subspace_service/mmr/
sync.rs

1use crate::mmr::get_offchain_key;
2use crate::mmr::request_handler::{MAX_MMR_ITEMS, MmrRequest, MmrResponse, generate_protocol_name};
3use futures::channel::oneshot;
4use parity_scale_codec::{Decode, Encode};
5use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
6use sc_network_sync::SyncingService;
7use sp_api::ProvideRuntimeApi;
8use sp_blockchain::HeaderBackend;
9use sp_core::offchain::storage::OffchainDb;
10use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
11use sp_core::{H256, Hasher};
12use sp_mmr_primitives::mmr_lib::{MMRStoreReadOps, MMRStoreWriteOps};
13use sp_mmr_primitives::utils::NodesUtils;
14use sp_mmr_primitives::{DataOrHash, MmrApi, mmr_lib};
15use sp_runtime::traits::{Block as BlockT, Keccak256, NumberFor};
16use sp_subspace_mmr::MmrLeaf;
17use std::cell::RefCell;
18use std::marker::PhantomData;
19use std::sync::Arc;
20use std::time::Duration;
21use subspace_core_primitives::{BlockHash, BlockNumber};
22use tokio::time::sleep;
23use tracing::{debug, error, trace};
24
/// MMR node; matched in this file as either `Node::Data` or `Node::Hash`.
type Node<H, L> = DataOrHash<H, L>;
/// MMR leaf parameterized with the `BlockNumber` and `BlockHash` primitives.
type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
/// Concrete node type used by this module: `Keccak256` hashing over [`MmrLeafOf`].
type NodeOf = Node<Keccak256, MmrLeafOf>;
/// MMR over [`NodeOf`] items, merged with [`MmrHasher`] and stored in [`OffchainMmrStorage`].
type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;
29
30pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
31    let node = match NodeOf::decode(&mut data) {
32        Ok(node) => node,
33        Err(err) => {
34            error!(?err, "Can't decode MMR data");
35
36            return Err(mmr_lib::Error::StoreError(
37                "Can't decode MMR data".to_string(),
38            ));
39        }
40    };
41
42    Ok(node)
43}
44
45struct OffchainMmrStorage<OS: OffchainStorage> {
46    offchain_db: RefCell<OffchainDb<OS>>,
47}
48
49impl<OS: OffchainStorage> OffchainMmrStorage<OS> {
50    fn new(offchain_storage: OS) -> Self {
51        let offchain_db = OffchainDb::new(offchain_storage);
52
53        Self {
54            offchain_db: RefCell::new(offchain_db),
55        }
56    }
57}
58
59impl<OS: OffchainStorage> MMRStoreReadOps<NodeOf> for OffchainMmrStorage<OS> {
60    fn get_elem(&self, pos: u64) -> mmr_lib::Result<Option<NodeOf>> {
61        let canon_key = get_offchain_key(pos);
62        let Some(data) = self
63            .offchain_db
64            .borrow_mut()
65            .local_storage_get(StorageKind::PERSISTENT, &canon_key)
66        else {
67            error!( %pos, "Can't get MMR data.");
68            return Ok(None);
69        };
70
71        let node = decode_mmr_data(data.as_slice());
72
73        node.map(Some)
74    }
75}
76
77impl<OS: OffchainStorage> MMRStoreWriteOps<NodeOf> for OffchainMmrStorage<OS> {
78    fn append(&mut self, pos: u64, elems: Vec<NodeOf>) -> mmr_lib::Result<()> {
79        let mut current_pos = pos;
80        for elem in elems {
81            let data = elem.encode();
82
83            let canon_key = get_offchain_key(current_pos);
84            self.offchain_db.borrow_mut().local_storage_set(
85                StorageKind::PERSISTENT,
86                &canon_key,
87                &data,
88            );
89
90            current_pos += 1;
91        }
92
93        Ok(())
94    }
95}
96
97/// Default Merging & Hashing behavior for MMR.
98pub struct MmrHasher;
99
100impl mmr_lib::Merge for MmrHasher {
101    type Item = NodeOf;
102
103    fn merge(left: &Self::Item, right: &Self::Item) -> mmr_lib::Result<Self::Item> {
104        let mut concat = left.hash().as_ref().to_vec();
105        concat.extend_from_slice(right.hash().as_ref());
106
107        Ok(Node::Hash(Keccak256::hash(&concat)))
108    }
109}
110
/// Delay before the next attempt when the peers info request fails or when no peer from the
/// current round completed the MMR sync.
const SYNC_PAUSE: Duration = Duration::from_secs(5);
112
113/// Synchronize and verification MMR-leafs from remote offchain storage of the synced peer.
114pub(crate) struct MmrSync<Client, Block, OS: OffchainStorage> {
115    client: Arc<Client>,
116    mmr: MmrOf<OS>,
117    leaves_number: u32,
118    _data: PhantomData<Block>,
119}
120
121impl<Client, Block, OS> MmrSync<Client, Block, OS>
122where
123    Block: BlockT,
124    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
125    Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
126    OS: OffchainStorage,
127{
128    pub fn new(client: Arc<Client>, offchain_storage: OS) -> Self {
129        Self {
130            client,
131            mmr: MmrOf::new(0, OffchainMmrStorage::new(offchain_storage)),
132            leaves_number: 0,
133            _data: Default::default(),
134        }
135    }
136
137    // TODO: Add support for MMR-sync reruns from non-zero starting point.
138    /// Synchronize MMR-leafs from remote offchain storage of the synced peer.
139    pub(crate) async fn sync<NR>(
140        &mut self,
141        fork_id: Option<String>,
142        network_service: NR,
143        sync_service: Arc<SyncingService<Block>>,
144        target_block: BlockNumber,
145    ) -> Result<(), sp_blockchain::Error>
146    where
147        NR: NetworkRequest,
148    {
149        debug!("MMR sync started.");
150        let info = self.client.info();
151        let protocol_name = generate_protocol_name(info.genesis_hash, fork_id.as_deref());
152
153        let mut leaves_number = 0u32;
154        let mut starting_position = 0;
155
156        'outer: loop {
157            let peers_info = match sync_service.peers_info().await {
158                Ok(peers_info) => peers_info,
159                Err(error) => {
160                    error!("Peers info request returned an error: {error}",);
161                    sleep(SYNC_PAUSE).await;
162                    continue;
163                }
164            };
165
166            //  Enumerate peers until we find a suitable source for MMR
167            'peers: for (peer_id, peer_info) in peers_info.iter() {
168                trace!("MMR sync. peer = {peer_id}, info = {:?}", peer_info);
169
170                if !peer_info.is_synced {
171                    trace!("MMR sync skipped (not synced). peer = {peer_id}");
172
173                    continue;
174                }
175
176                // Request MMR until target block reached.
177                loop {
178                    let target_position = {
179                        let nodes = NodesUtils::new(target_block.into());
180
181                        let target_position = nodes.size().saturating_sub(1);
182
183                        debug!(
184                            "MMR-sync. Target block={}, Node target position={}",
185                            target_block, target_position
186                        );
187
188                        target_position
189                    };
190
191                    let request = MmrRequest {
192                        starting_position,
193                        limit: MAX_MMR_ITEMS,
194                    };
195                    let response = send_mmr_request(
196                        protocol_name.clone(),
197                        *peer_id,
198                        request,
199                        &network_service,
200                    )
201                    .await;
202
203                    match response {
204                        Ok(response) => {
205                            trace!("Response: {:?}", response.mmr_data.len());
206
207                            if response.mmr_data.is_empty() {
208                                debug!("Empty response from peer={}", peer_id);
209                                break;
210                            }
211
212                            // Save the MMR-nodes from response to the local storage
213                            'data: for (position, data) in response.mmr_data.iter() {
214                                // Ensure continuous sync
215                                if *position != starting_position {
216                                    debug!(
217                                        ?peer_info,
218                                        %starting_position,
219                                        %position,
220                                        "MMR sync error: incorrect starting position."
221                                    );
222
223                                    continue 'peers;
224                                }
225
226                                let node = decode_mmr_data(data);
227
228                                let node = match node {
229                                    Ok(node) => node,
230                                    Err(err) => {
231                                        debug!( ?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");
232
233                                        continue 'peers;
234                                    }
235                                };
236
237                                if matches!(node, Node::Data(_)) {
238                                    if let Err(err) = self.mmr.push(node) {
239                                        debug!( ?peer_info, ?err, %position, "Can't add MMR data received from the peer.");
240
241                                        return Err(sp_blockchain::Error::Backend(
242                                            "Can't add MMR data to the MMR storage".to_string(),
243                                        ));
244                                    }
245
246                                    leaves_number += 1;
247                                }
248
249                                starting_position += 1;
250
251                                // Did we collect all the necessary data from the last response?
252                                if u64::from(*position) >= target_position {
253                                    debug!( %target_position, "MMR-sync: target position reached.");
254                                    break 'data;
255                                }
256                            }
257                        }
258                        Err(error) => {
259                            debug!("MMR sync request failed. peer = {peer_id}: {error}");
260
261                            continue 'peers;
262                        }
263                    }
264
265                    // Should we request a new portion of the data from the last peer?
266                    if target_position <= starting_position.into() {
267                        if let Err(err) = self.mmr.commit() {
268                            error!(?err, "MMR commit failed.");
269
270                            return Err(sp_blockchain::Error::Application(
271                                "Failed to commit MMR data.".into(),
272                            ));
273                        }
274
275                        // Actual MMR-nodes may exceed this number, however, we will catch up with the rest
276                        // when we sync the remaining data (consensus and domain chains).
277                        debug!("Target position reached: {target_position}");
278
279                        break 'outer;
280                    }
281                }
282            }
283            debug!( %starting_position, "No synced peers to handle the MMR-sync. Pausing...",);
284            sleep(SYNC_PAUSE).await;
285        }
286
287        debug!("MMR sync finished.");
288        self.leaves_number = leaves_number;
289        Ok(())
290    }
291
292    pub(crate) fn verify_mmr_data(&self) -> Result<(), sp_blockchain::Error> {
293        debug!("Verifying MMR data...");
294
295        let block_number = self.leaves_number;
296        let Some(hash) = self.client.hash(block_number.into())? else {
297            error!( %block_number, "MMR data verification: error during hash acquisition");
298            return Err(sp_blockchain::Error::UnknownBlock(
299                "Failed to get Block hash for Number: {block_number:?}".to_string(),
300            ));
301        };
302
303        let mmr_root = self.mmr.get_root();
304        trace!("MMR root: {:?}", mmr_root);
305        let api_root = self.client.runtime_api().mmr_root(hash);
306        trace!("API root: {:?}", api_root);
307
308        let Ok(Node::Hash(mmr_root_hash)) = mmr_root.clone() else {
309            error!( %block_number, ?mmr_root, "Can't get MMR root from local storage.");
310            return Err(sp_blockchain::Error::Application(
311                "Can't get MMR root from local storage".into(),
312            ));
313        };
314
315        let Ok(Ok(api_root_hash)) = api_root else {
316            error!( %block_number, ?mmr_root, "Can't get MMR root from API.");
317            return Err(sp_blockchain::Error::Application(
318                "Can't get MMR root from API.".into(),
319            ));
320        };
321
322        if api_root_hash != mmr_root_hash {
323            error!(?api_root_hash, ?mmr_root_hash, "MMR data hashes differ.");
324            return Err(sp_blockchain::Error::Application(
325                "MMR data hashes differ.".into(),
326            ));
327        }
328
329        debug!("MMR data verified");
330
331        Ok(())
332    }
333}
334
/// Errors returned by an MMR request to a remote peer.
#[derive(Debug, thiserror::Error)]
pub enum MmrResponseError {
    /// The network layer reported a request failure.
    #[error("MMR request failed: {0}")]
    RequestFailed(#[from] RequestFailure),

    /// The response channel was closed before a result was delivered.
    #[error("MMR request canceled")]
    RequestCanceled,

    /// The response arrived under a protocol name different from the requested one.
    #[error("MMR request failed: invalid protocol")]
    InvalidProtocol,

    /// The response payload could not be decoded; carries the decoding error message.
    #[error("Failed to decode response: {0}")]
    DecodeFailed(String),
}
350
351async fn send_mmr_request<NR: NetworkRequest>(
352    protocol_name: String,
353    peer_id: PeerId,
354    request: MmrRequest,
355    network_service: &NR,
356) -> Result<MmrResponse, MmrResponseError> {
357    let (tx, rx) = oneshot::channel();
358
359    debug!("Sending request: {request:?}  (peer={peer_id})");
360
361    let encoded_request = request.encode();
362
363    network_service.start_request(
364        peer_id,
365        protocol_name.clone().into(),
366        encoded_request,
367        None,
368        tx,
369        IfDisconnected::ImmediateError,
370    );
371
372    let result = rx.await.map_err(|_| MmrResponseError::RequestCanceled)?;
373
374    match result {
375        Ok((data, response_protocol_name)) => {
376            if response_protocol_name != protocol_name.into() {
377                return Err(MmrResponseError::InvalidProtocol);
378            }
379
380            let response = decode_mmr_response(&data).map_err(MmrResponseError::DecodeFailed)?;
381
382            Ok(response)
383        }
384        Err(error) => Err(error.into()),
385    }
386}
387
388fn decode_mmr_response(mut response: &[u8]) -> Result<MmrResponse, String> {
389    let response = MmrResponse::decode(&mut response)
390        .map_err(|error| format!("Failed to decode state response: {error}"))?;
391
392    Ok(response)
393}