subspace_service/sync_from_dsn.rs

pub(crate) mod import_blocks;
pub(crate) mod piece_validator;
pub(crate) mod segment_header_downloader;
pub(crate) mod snap_sync;

use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn;
use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, select};
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus_subspace::archiver::SegmentHeadersStore;
use sc_network::NetworkBlock;
use sc_network::service::traits::NetworkService;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::SubspaceApi;
use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::{Duration, Instant};
use subspace_core_primitives::PublicKey;
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::SegmentIndex;
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_erasure_coding::ErasureCoding;
use subspace_networking::Node;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use tracing::{debug, info, warn};

/// How much time to wait for a new block to be imported before timing out and starting sync from DSN
const NO_IMPORTED_BLOCKS_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Frequency with which to check whether the node is online or not
const CHECK_ONLINE_STATUS_INTERVAL: Duration = Duration::from_secs(1);
/// Frequency with which to check whether the node is almost synced to the tip of the observed chain
const CHECK_ALMOST_SYNCED_INTERVAL: Duration = Duration::from_secs(1);
/// Period of time during which the node should be offline for DSN sync to kick in
const MIN_OFFLINE_PERIOD: Duration = Duration::from_secs(60);
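
// Taken together: the imported-blocks observer sends `NotificationReason::NoImportedBlocks` after
// `NO_IMPORTED_BLOCKS_TIMEOUT` passes without a block import, the Substrate network observer polls
// every `CHECK_ONLINE_STATUS_INTERVAL` and sends `NotificationReason::WentOnlineSubstrate` once the
// node regains peers after having had none for at least `MIN_OFFLINE_PERIOD`, and the sync worker
// polls every `CHECK_ALMOST_SYNCED_INTERVAL` to stop DSN sync once the node is close to the sync
// target.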

/// Wrapper type for [`PieceProvider`], so it can implement [`PieceGetter`]
pub struct DsnPieceGetter<PV: PieceValidator>(PieceProvider<PV>);

impl<PV> fmt::Debug for DsnPieceGetter<PV>
where
    PV: PieceValidator,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("DsnPieceGetter")
            .field(&format!("{:?}", self.0))
            .finish()
    }
}

#[async_trait]
impl<PV> PieceGetter for DsnPieceGetter<PV>
where
    PV: PieceValidator,
{
    #[inline]
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        Ok(self.0.get_piece_from_cache(piece_index).await)
    }

    #[inline]
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        let stream = self
            .0
            .get_from_cache(piece_indices)
            .await
            .map(|(piece_index, maybe_piece)| (piece_index, Ok(maybe_piece)));
        Ok(Box::new(stream))
    }
}

impl<PV> DsnPieceGetter<PV>
where
    PV: PieceValidator,
{
    /// Creates a new DSN piece getter
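    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doc-test); it assumes a `piece_provider` of type
    /// `PieceProvider<PV>` and a `piece_index` of type `PieceIndex` are already available:
    ///
    /// ```ignore
    /// let piece_getter = DsnPieceGetter::new(piece_provider);
    /// // `DsnPieceGetter` implements `PieceGetter`, so pieces can be fetched from the DSN cache:
    /// let maybe_piece: Option<Piece> = piece_getter.get_piece(piece_index).await?;
    /// ```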
    pub fn new(piece_provider: PieceProvider<PV>) -> Self {
        Self(piece_provider)
    }
}

#[derive(Debug)]
enum NotificationReason {
    NoImportedBlocks,
    // TODO: Restore or remove connected peer later
    #[allow(dead_code)]
    WentOnlineSubspace,
    WentOnlineSubstrate,
}

/// Create a node observer that will track the node state and send notifications to the worker to
/// start sync from DSN.
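///
/// # Example
///
/// A sketch of how the two returned futures might be driven (illustrative only; in the node they
/// would normally be handed to the service's task spawner, so the exact wiring here is an
/// assumption):
///
/// ```ignore
/// let (observer_fut, worker_fut) = create_observer_and_worker(/* ... */);
/// // The observer only watches the node and sends notifications, so it can run in the background.
/// tokio::spawn(observer_fut);
/// // The worker performs the actual sync from DSN and reports errors to the caller.
/// if let Err(error) = worker_fut.await {
///     tracing::error!(%error, "Sync from DSN failed");
/// }
/// ```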
#[allow(clippy::too_many_arguments)]
pub(super) fn create_observer_and_worker<Block, AS, NB, Client, PG>(
    segment_headers_store: SegmentHeadersStore<AS>,
    network_service: Arc<dyn NetworkService>,
    node: Node,
    client: Arc<Client>,
    mut import_queue_service: Box<dyn ImportQueueService<Block>>,
    network_block: NB,
    sync_target_block_number: Arc<AtomicU32>,
    pause_sync: Arc<AtomicBool>,
    piece_getter: PG,
    erasure_coding: ErasureCoding,
) -> (
    impl Future<Output = ()> + Send + 'static,
    impl Future<Output = Result<(), sc_service::Error>> + Send + 'static,
)
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    NB: NetworkBlock<Block::Hash, NumberFor<Block>> + Send + 'static,
    Client: HeaderBackend<Block>
        + BlockBackend<Block>
        + BlockchainEvents<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>,
    PG: PieceGetter + Send + Sync + 'static,
{
    let (tx, rx) = mpsc::channel(0);
    let observer_fut = {
        let node = node.clone();
        let client = Arc::clone(&client);

        async move { create_observer(network_service.as_ref(), &node, client.as_ref(), tx).await }
    };
    let worker_fut = async move {
        create_worker(
            segment_headers_store,
            &node,
            client.as_ref(),
            import_queue_service.as_mut(),
            network_block,
            sync_target_block_number,
            pause_sync,
            rx,
            &piece_getter,
            &erasure_coding,
        )
        .await
    };
    (observer_fut, worker_fut)
}

async fn create_observer<Block, Client>(
    network_service: &dyn NetworkService,
    _node: &Node,
    client: &Client,
    notifications_sender: mpsc::Sender<NotificationReason>,
) where
    Block: BlockT,
    Client: BlockchainEvents<Block> + Send + Sync + 'static,
{
    // // Separate reactive observer for Subspace networking that is not a future
    // let _handler_id = node.on_num_established_peer_connections_change({
    //     // Assuming node is offline by default
    //     let last_online = Atomic::new(None::<Instant>);
    //     let notifications_sender = notifications_sender.clone();
    //
    //     Arc::new(move |&new_connections| {
    //         let is_online = new_connections > 0;
    //         let was_online = last_online
    //             .load(Ordering::AcqRel)
    //             .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
    //             .unwrap_or_default();
    //
    //         if is_online && !was_online {
    //             // Doesn't matter if sending failed here
    //             let _ = notifications_sender
    //                 .clone()
    //                 .try_send(NotificationReason::WentOnlineSubspace);
    //         }
    //
    //         if is_online {
    //             last_online.store(Some(Instant::now()), Ordering::Release);
    //         }
    //     })
    // });
    select! {
        _ = create_imported_blocks_observer(client, notifications_sender.clone()).fuse() => {
            // Runs indefinitely
        }
        _ = create_substrate_network_observer(network_service, notifications_sender).fuse() => {
            // Runs indefinitely
        }
    }
}

async fn create_imported_blocks_observer<Block, Client>(
    client: &Client,
    mut notifications_sender: mpsc::Sender<NotificationReason>,
) where
    Block: BlockT,
    Client: BlockchainEvents<Block> + Send + Sync + 'static,
{
    let mut import_notification_stream = client.every_import_notification_stream();
    loop {
        match tokio::time::timeout(
            NO_IMPORTED_BLOCKS_TIMEOUT,
            import_notification_stream.next(),
        )
        .await
        {
            Ok(Some(_notification)) => {
                // Do nothing
            }
            Ok(None) => {
                // No more notifications
                return;
            }
            Err(_timeout) => {
                if let Err(error) =
                    notifications_sender.try_send(NotificationReason::NoImportedBlocks)
                    && error.is_disconnected()
                {
                    // Receiving side was closed
                    return;
                }
            }
        }
    }
}

async fn create_substrate_network_observer(
    network_service: &dyn NetworkService,
    mut notifications_sender: mpsc::Sender<NotificationReason>,
) {
    // Assuming the node is offline by default
    let mut last_online = None::<Instant>;

    loop {
        tokio::time::sleep(CHECK_ONLINE_STATUS_INTERVAL).await;

        let is_online = network_service.sync_num_connected() > 0;

        let was_online = last_online
            .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
            .unwrap_or_default();
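        // Only treat this as "going online" if the node has peers now and has had none for at
        // least `MIN_OFFLINE_PERIOD`; this avoids re-triggering DSN sync on short connectivity
        // blips.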
        if is_online
            && !was_online
            && let Err(error) =
                notifications_sender.try_send(NotificationReason::WentOnlineSubstrate)
            && error.is_disconnected()
        {
            // Receiving side was closed
            return;
        }

        if is_online {
            last_online.replace(Instant::now());
        }
    }
}

#[allow(clippy::too_many_arguments)]
async fn create_worker<Block, AS, IQS, NB, Client, PG>(
    segment_headers_store: SegmentHeadersStore<AS>,
    node: &Node,
    client: &Client,
    import_queue_service: &mut IQS,
    network_block: NB,
    sync_target_block_number: Arc<AtomicU32>,
    pause_sync: Arc<AtomicBool>,
    mut notifications: mpsc::Receiver<NotificationReason>,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> Result<(), sc_service::Error>
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    IQS: ImportQueueService<Block> + ?Sized,
    NB: NetworkBlock<Block::Hash, NumberFor<Block>>,
    Client: HeaderBackend<Block>
        + BlockBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>,
    PG: PieceGetter,
{
    let info = client.info();
    let chain_constants = client
        .runtime_api()
        .chain_constants(info.best_hash)
        .map_err(|error| error.to_string())?;

    // This is the last segment index that has been fully processed by DSN sync.
    // If a segment has a partial block at the end, it is not fully processed until that block is
    // processed.
    //
    // Segment zero corresponds to the contents of the genesis block; every node has it, so we
    // consider it processed right away.
    let mut last_completed_segment_index = SegmentIndex::ZERO;

    // This is the last block number that has been queued for import by DSN sync.
    // (Or we've checked for its header and it has already been imported.)
    //
    // TODO: We'll be able to just take finalized block once we are able to decouple pruning from
    //  finality: https://github.com/paritytech/polkadot-sdk/issues/1570
    let mut last_processed_block_number = info.best_number;
    let segment_header_downloader = SegmentHeaderDownloader::new(node);

    while let Some(reason) = notifications.next().await {
        info!(
            ?reason,
            ?last_completed_segment_index,
            ?last_processed_block_number,
            "Received notification to sync from DSN, deactivating substrate sync"
        );
        pause_sync.store(true, Ordering::Release);

        // TODO: Maybe handle failed block imports, additional helpful logging
        let import_blocks_from_dsn_fut = import_blocks_from_dsn(
            &segment_headers_store,
            &segment_header_downloader,
            client,
            piece_getter,
            import_queue_service,
            &mut last_completed_segment_index,
            &mut last_processed_block_number,
            erasure_coding,
        );
        let wait_almost_synced_fut = async {
            loop {
                tokio::time::sleep(CHECK_ALMOST_SYNCED_INTERVAL).await;

                let info = client.info();
                let target_block_number =
                    NumberFor::<Block>::from(sync_target_block_number.load(Ordering::Relaxed));

                // If there are fewer blocks than the confirmation depth to the tip of the chain,
                // there is no need to worry about DSN sync anymore; it will not be helpful anyway.
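                // For example, if `confirmation_depth_k` were 100 with a target block of 1_000
                // and a best block of 950, the remaining gap of 50 is within the confirmation
                // depth, so DSN sync stops until the next notification.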
                if target_block_number
                    .checked_sub(&info.best_number)
                    .map(|diff| diff < chain_constants.confirmation_depth_k().into())
                    .unwrap_or_default()
                {
                    debug!(
                        best_block = ?info.best_number,
                        ?target_block_number,
                        "Node is almost synced, stopping DSN sync until the next notification"
                    );
                    break;
                }
            }
        };

        select! {
            result = import_blocks_from_dsn_fut.fuse() => {
                if let Err(error) = result {
                    warn!(
                        %error,
                        ?last_completed_segment_index,
                        ?last_processed_block_number,
                        "Error when syncing blocks from DSN, stopping DSN sync until the next notification"
                    );
                }
            }
            _ = wait_almost_synced_fut.fuse() => {
                // Almost synced, DSN sync can't possibly help here
            }
        }

        while notifications.try_next().is_ok() {
            // Just drain extra messages if there are any
        }

        // Notify Substrate's sync mechanism to allow regular Substrate sync to continue
        // gracefully. We do this at the end of the loop, to minimise race conditions which can
        // hide DSN sync bugs.
        debug!(
            ?last_completed_segment_index,
            ?last_processed_block_number,
            "Finished DSN sync, activating substrate sync"
        );
        {
            let info = client.info();
            network_block.new_best_block_imported(info.best_hash, info.best_number);
        }
        pause_sync.store(false, Ordering::Release);
    }

    Ok(())
}