subspace_service/mmr/
sync.rs

1use crate::mmr::get_offchain_key;
2use crate::mmr::request_handler::{generate_protocol_name, MmrRequest, MmrResponse, MAX_MMR_ITEMS};
3use crate::sync_from_dsn::LOG_TARGET;
4use futures::channel::oneshot;
5use parity_scale_codec::{Decode, Encode};
6use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
7use sc_network_sync::SyncingService;
8use sp_api::ProvideRuntimeApi;
9use sp_blockchain::HeaderBackend;
10use sp_core::offchain::storage::OffchainDb;
11use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
12use sp_core::{Hasher, H256};
13use sp_mmr_primitives::mmr_lib::{MMRStoreReadOps, MMRStoreWriteOps};
14use sp_mmr_primitives::utils::NodesUtils;
15use sp_mmr_primitives::{mmr_lib, DataOrHash, MmrApi};
16use sp_runtime::traits::{Block as BlockT, Keccak256, NumberFor};
17use sp_subspace_mmr::MmrLeaf;
18use std::cell::RefCell;
19use std::marker::PhantomData;
20use std::sync::Arc;
21use std::time::Duration;
22use subspace_core_primitives::{BlockHash, BlockNumber};
23use tokio::time::sleep;
24use tracing::{debug, error, trace};
25
/// Generic MMR node: holds either leaf data (`Data`) or only a hash (`Hash`).
type Node<H, L> = DataOrHash<H, L>;
/// MMR leaf payload specialized to this chain's block number and block hash types.
type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
/// Concrete MMR node used by the sync: Keccak256 hashing over [`MmrLeafOf`] leaves.
type NodeOf = Node<Keccak256, MmrLeafOf>;
/// MMR instance over [`NodeOf`] nodes, merged with [`MmrHasher`] and stored in offchain storage.
type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;
30
31pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
32    let node = match NodeOf::decode(&mut data) {
33        Ok(node) => node,
34        Err(err) => {
35            error!(target: LOG_TARGET, ?err, "Can't decode MMR data");
36
37            return Err(mmr_lib::Error::StoreError(
38                "Can't decode MMR data".to_string(),
39            ));
40        }
41    };
42
43    Ok(node)
44}
45
46struct OffchainMmrStorage<OS: OffchainStorage> {
47    offchain_db: RefCell<OffchainDb<OS>>,
48}
49
50impl<OS: OffchainStorage> OffchainMmrStorage<OS> {
51    fn new(offchain_storage: OS) -> Self {
52        let offchain_db = OffchainDb::new(offchain_storage);
53
54        Self {
55            offchain_db: RefCell::new(offchain_db),
56        }
57    }
58}
59
60impl<OS: OffchainStorage> MMRStoreReadOps<NodeOf> for OffchainMmrStorage<OS> {
61    fn get_elem(&self, pos: u64) -> mmr_lib::Result<Option<NodeOf>> {
62        let canon_key = get_offchain_key(pos);
63        let Some(data) = self
64            .offchain_db
65            .borrow_mut()
66            .local_storage_get(StorageKind::PERSISTENT, &canon_key)
67        else {
68            error!(target: LOG_TARGET, %pos, "Can't get MMR data.");
69
70            return Ok(None);
71        };
72
73        let node = decode_mmr_data(data.as_slice());
74
75        node.map(Some)
76    }
77}
78
79impl<OS: OffchainStorage> MMRStoreWriteOps<NodeOf> for OffchainMmrStorage<OS> {
80    fn append(&mut self, pos: u64, elems: Vec<NodeOf>) -> mmr_lib::Result<()> {
81        let mut current_pos = pos;
82        for elem in elems {
83            let data = elem.encode();
84
85            let canon_key = get_offchain_key(current_pos);
86            self.offchain_db.borrow_mut().local_storage_set(
87                StorageKind::PERSISTENT,
88                &canon_key,
89                &data,
90            );
91
92            current_pos += 1;
93        }
94
95        Ok(())
96    }
97}
98
99/// Default Merging & Hashing behavior for MMR.
100pub struct MmrHasher;
101
102impl mmr_lib::Merge for MmrHasher {
103    type Item = NodeOf;
104
105    fn merge(left: &Self::Item, right: &Self::Item) -> mmr_lib::Result<Self::Item> {
106        let mut concat = left.hash().as_ref().to_vec();
107        concat.extend_from_slice(right.hash().as_ref());
108
109        Ok(Node::Hash(Keccak256::hash(&concat)))
110    }
111}
112
/// Pause before retrying after a failed peers-info request or after a full pass
/// over the known peers that did not complete the sync.
const SYNC_PAUSE: Duration = Duration::from_secs(5);
114
115/// Synchronize and verification MMR-leafs from remote offchain storage of the synced peer.
116pub(crate) struct MmrSync<Client, Block, OS: OffchainStorage> {
117    client: Arc<Client>,
118    mmr: MmrOf<OS>,
119    leaves_number: u32,
120    _data: PhantomData<Block>,
121}
122
123impl<Client, Block, OS> MmrSync<Client, Block, OS>
124where
125    Block: BlockT,
126    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
127    Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
128    OS: OffchainStorage,
129{
130    pub fn new(client: Arc<Client>, offchain_storage: OS) -> Self {
131        Self {
132            client,
133            mmr: MmrOf::new(0, OffchainMmrStorage::new(offchain_storage)),
134            leaves_number: 0,
135            _data: Default::default(),
136        }
137    }
138
139    // TODO: Add support for MMR-sync reruns from non-zero starting point.
140    /// Synchronize MMR-leafs from remote offchain storage of the synced peer.
141    pub(crate) async fn sync<NR>(
142        &mut self,
143        fork_id: Option<String>,
144        network_service: NR,
145        sync_service: Arc<SyncingService<Block>>,
146        target_block: BlockNumber,
147    ) -> Result<(), sp_blockchain::Error>
148    where
149        NR: NetworkRequest,
150    {
151        debug!(target: LOG_TARGET, "MMR sync started.");
152        let info = self.client.info();
153        let protocol_name = generate_protocol_name(info.genesis_hash, fork_id.as_deref());
154
155        let mut leaves_number = 0u32;
156        let mut starting_position = 0;
157
158        'outer: loop {
159            let peers_info = match sync_service.peers_info().await {
160                Ok(peers_info) => peers_info,
161                Err(error) => {
162                    error!(target: LOG_TARGET, "Peers info request returned an error: {error}",);
163                    sleep(SYNC_PAUSE).await;
164                    continue;
165                }
166            };
167
168            //  Enumerate peers until we find a suitable source for MMR
169            'peers: for (peer_id, peer_info) in peers_info.iter() {
170                trace!(target: LOG_TARGET, "MMR sync. peer = {peer_id}, info = {:?}", peer_info);
171
172                if !peer_info.is_synced {
173                    trace!(target: LOG_TARGET, "MMR sync skipped (not synced). peer = {peer_id}");
174
175                    continue;
176                }
177
178                // Request MMR until target block reached.
179                loop {
180                    let target_position = {
181                        let nodes = NodesUtils::new(target_block.into());
182
183                        let target_position = nodes.size().saturating_sub(1);
184
185                        debug!(
186                            target: LOG_TARGET,
187                            "MMR-sync. Target block={}, Node target position={}",
188                            target_block, target_position
189                        );
190
191                        target_position
192                    };
193
194                    let request = MmrRequest {
195                        starting_position,
196                        limit: MAX_MMR_ITEMS,
197                    };
198                    let response = send_mmr_request(
199                        protocol_name.clone(),
200                        *peer_id,
201                        request,
202                        &network_service,
203                    )
204                    .await;
205
206                    match response {
207                        Ok(response) => {
208                            trace!(target: LOG_TARGET, "Response: {:?}", response.mmr_data.len());
209
210                            if response.mmr_data.is_empty() {
211                                debug!(target: LOG_TARGET, "Empty response from peer={}", peer_id);
212                                break;
213                            }
214
215                            // Save the MMR-nodes from response to the local storage
216                            'data: for (position, data) in response.mmr_data.iter() {
217                                // Ensure continuous sync
218                                if *position != starting_position {
219                                    debug!(
220                                        target: LOG_TARGET,
221                                        ?peer_info,
222                                        %starting_position,
223                                        %position,
224                                        "MMR sync error: incorrect starting position."
225                                    );
226
227                                    continue 'peers;
228                                }
229
230                                let node = decode_mmr_data(data);
231
232                                let node = match node {
233                                    Ok(node) => node,
234                                    Err(err) => {
235                                        debug!(target: LOG_TARGET, ?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");
236
237                                        continue 'peers;
238                                    }
239                                };
240
241                                if matches!(node, Node::Data(_)) {
242                                    if let Err(err) = self.mmr.push(node) {
243                                        debug!(target: LOG_TARGET, ?peer_info, ?err, %position, "Can't add MMR data received from the peer.");
244
245                                        return Err(sp_blockchain::Error::Backend(
246                                            "Can't add MMR data to the MMR storage".to_string(),
247                                        ));
248                                    }
249
250                                    leaves_number += 1;
251                                }
252
253                                starting_position += 1;
254
255                                // Did we collect all the necessary data from the last response?
256                                if u64::from(*position) >= target_position {
257                                    debug!(target: LOG_TARGET, %target_position, "MMR-sync: target position reached.");
258                                    break 'data;
259                                }
260                            }
261                        }
262                        Err(error) => {
263                            debug!(target: LOG_TARGET, "MMR sync request failed. peer = {peer_id}: {error}");
264
265                            continue 'peers;
266                        }
267                    }
268
269                    // Should we request a new portion of the data from the last peer?
270                    if target_position <= starting_position.into() {
271                        if let Err(err) = self.mmr.commit() {
272                            error!(target: LOG_TARGET, ?err, "MMR commit failed.");
273
274                            return Err(sp_blockchain::Error::Application(
275                                "Failed to commit MMR data.".into(),
276                            ));
277                        }
278
279                        // Actual MMR-nodes may exceed this number, however, we will catch up with the rest
280                        // when we sync the remaining data (consensus and domain chains).
281                        debug!(target: LOG_TARGET, "Target position reached: {target_position}");
282
283                        break 'outer;
284                    }
285                }
286            }
287            debug!(target: LOG_TARGET, %starting_position, "No synced peers to handle the MMR-sync. Pausing...",);
288            sleep(SYNC_PAUSE).await;
289        }
290
291        debug!(target: LOG_TARGET, "MMR sync finished.");
292        self.leaves_number = leaves_number;
293        Ok(())
294    }
295
296    pub(crate) fn verify_mmr_data(&self) -> Result<(), sp_blockchain::Error> {
297        debug!(target: LOG_TARGET, "Verifying MMR data...");
298
299        let block_number = self.leaves_number;
300        let Some(hash) = self.client.hash(block_number.into())? else {
301            error!(target: LOG_TARGET, %block_number, "MMR data verification: error during hash acquisition");
302            return Err(sp_blockchain::Error::UnknownBlock(
303                "Failed to get Block hash for Number: {block_number:?}".to_string(),
304            ));
305        };
306
307        let mmr_root = self.mmr.get_root();
308        trace!(target: LOG_TARGET, "MMR root: {:?}", mmr_root);
309        let api_root = self.client.runtime_api().mmr_root(hash);
310        trace!(target: LOG_TARGET, "API root: {:?}", api_root);
311
312        let Ok(Node::Hash(mmr_root_hash)) = mmr_root.clone() else {
313            error!(target: LOG_TARGET, %block_number, ?mmr_root, "Can't get MMR root from local storage.");
314            return Err(sp_blockchain::Error::Application(
315                "Can't get MMR root from local storage".into(),
316            ));
317        };
318
319        let Ok(Ok(api_root_hash)) = api_root else {
320            error!(target: LOG_TARGET, %block_number, ?mmr_root, "Can't get MMR root from API.");
321            return Err(sp_blockchain::Error::Application(
322                "Can't get MMR root from API.".into(),
323            ));
324        };
325
326        if api_root_hash != mmr_root_hash {
327            error!(target: LOG_TARGET, ?api_root_hash, ?mmr_root_hash, "MMR data hashes differ.");
328            return Err(sp_blockchain::Error::Application(
329                "MMR data hashes differ.".into(),
330            ));
331        }
332
333        debug!(target: LOG_TARGET, "MMR data verified");
334
335        Ok(())
336    }
337}
338
/// Errors that can occur while requesting MMR data from a peer.
#[derive(Debug, thiserror::Error)]
pub enum MmrResponseError {
    /// The network layer reported a failure for the request.
    #[error("MMR request failed: {0}")]
    RequestFailed(#[from] RequestFailure),

    /// The response channel was dropped before a result was delivered.
    #[error("MMR request canceled")]
    RequestCanceled,

    /// The response arrived under a protocol name different from the requested one.
    #[error("MMR request failed: invalid protocol")]
    InvalidProtocol,

    /// The response payload could not be decoded; carries the decoding error text.
    #[error("Failed to decode response: {0}")]
    DecodeFailed(String),
}
354
355async fn send_mmr_request<NR: NetworkRequest>(
356    protocol_name: String,
357    peer_id: PeerId,
358    request: MmrRequest,
359    network_service: &NR,
360) -> Result<MmrResponse, MmrResponseError> {
361    let (tx, rx) = oneshot::channel();
362
363    debug!(target: LOG_TARGET, "Sending request: {request:?}  (peer={peer_id})");
364
365    let encoded_request = request.encode();
366
367    network_service.start_request(
368        peer_id,
369        protocol_name.clone().into(),
370        encoded_request,
371        None,
372        tx,
373        IfDisconnected::ImmediateError,
374    );
375
376    let result = rx.await.map_err(|_| MmrResponseError::RequestCanceled)?;
377
378    match result {
379        Ok((data, response_protocol_name)) => {
380            if response_protocol_name != protocol_name.into() {
381                return Err(MmrResponseError::InvalidProtocol);
382            }
383
384            let response = decode_mmr_response(&data).map_err(MmrResponseError::DecodeFailed)?;
385
386            Ok(response)
387        }
388        Err(error) => Err(error.into()),
389    }
390}
391
392fn decode_mmr_response(mut response: &[u8]) -> Result<MmrResponse, String> {
393    let response = MmrResponse::decode(&mut response)
394        .map_err(|error| format!("Failed to decode state response: {error}"))?;
395
396    Ok(response)
397}