subspace_service/sync_from_dsn.rs

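//! Syncing consensus blocks from the Subspace Distributed Storage Network (DSN).
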
pub(crate) mod import_blocks;
pub(crate) mod piece_validator;
pub(crate) mod segment_header_downloader;
pub(crate) mod snap_sync;

use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn;
use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::{select, FutureExt, Stream, StreamExt};
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus_subspace::archiver::SegmentHeadersStore;
use sc_network::service::traits::NetworkService;
use sc_network::NetworkBlock;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::SubspaceApi;
use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::SegmentIndex;
use subspace_core_primitives::PublicKey;
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_erasure_coding::ErasureCoding;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use subspace_networking::Node;
use tracing::{debug, info, warn};

/// How long to wait for a new block to be imported before timing out and starting sync from the DSN
const NO_IMPORTED_BLOCKS_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// How often to check whether the node is online
const CHECK_ONLINE_STATUS_INTERVAL: Duration = Duration::from_secs(1);
/// How often to check whether the node is almost synced to the tip of the observed chain
const CHECK_ALMOST_SYNCED_INTERVAL: Duration = Duration::from_secs(1);
/// How long the node must have been offline for DSN sync to kick in
const MIN_OFFLINE_PERIOD: Duration = Duration::from_secs(60);

pub(crate) const LOG_TARGET: &str = "consensus_sync";

/// Wrapper type for [`PieceProvider`], so it can implement [`PieceGetter`]
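///
/// A minimal usage sketch (not compiled; assumes a `piece_provider` that was already constructed
/// elsewhere and an arbitrary `piece_index`):
///
/// ```ignore
/// let piece_getter = DsnPieceGetter::new(piece_provider);
/// let maybe_piece = piece_getter.get_piece(piece_index).await?;
/// ```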
pub struct DsnPieceGetter<PV: PieceValidator>(PieceProvider<PV>);

impl<PV> fmt::Debug for DsnPieceGetter<PV>
where
    PV: PieceValidator,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("DsnPieceGetter")
            .field(&format!("{:?}", self.0))
            .finish()
    }
}

#[async_trait]
impl<PV> PieceGetter for DsnPieceGetter<PV>
where
    PV: PieceValidator,
{
    #[inline]
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        Ok(self.0.get_piece_from_cache(piece_index).await)
    }

    #[inline]
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        let stream = self
            .0
            .get_from_cache(piece_indices)
            .await
            .map(|(piece_index, maybe_piece)| (piece_index, Ok(maybe_piece)));
        Ok(Box::new(stream))
    }
}

impl<PV> DsnPieceGetter<PV>
where
    PV: PieceValidator,
{
    /// Creates a new DSN piece getter
    pub fn new(piece_provider: PieceProvider<PV>) -> Self {
        Self(piece_provider)
    }
}

/// Reason for notifying the sync worker to attempt a DSN sync
#[derive(Debug)]
enum NotificationReason {
    /// No new blocks were imported for [`NO_IMPORTED_BLOCKS_TIMEOUT`]
    NoImportedBlocks,
    // TODO: Restore or remove connected peer later
    /// Subspace networking went from offline to online
    #[allow(dead_code)]
    WentOnlineSubspace,
    /// Substrate networking went from offline to online
    WentOnlineSubstrate,
}

/// Create a node observer that tracks the node state and sends notifications to the worker to
/// start syncing from the DSN.
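///
/// Both returned futures must be polled for the sync to make progress. A driving sketch
/// (illustrative only; `spawner` is a placeholder for whatever task spawner the caller uses):
///
/// ```ignore
/// let (observer_fut, worker_fut) = create_observer_and_worker(/* ... */);
/// // The observer runs indefinitely; the worker finishes when the notification channel closes
/// // or an error occurs.
/// spawner.spawn(observer_fut);
/// worker_fut.await?;
/// ```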
#[allow(clippy::too_many_arguments)]
pub(super) fn create_observer_and_worker<Block, AS, NB, Client, PG>(
    segment_headers_store: SegmentHeadersStore<AS>,
    network_service: Arc<dyn NetworkService>,
    node: Node,
    client: Arc<Client>,
    mut import_queue_service: Box<dyn ImportQueueService<Block>>,
    network_block: NB,
    sync_target_block_number: Arc<AtomicU32>,
    pause_sync: Arc<AtomicBool>,
    piece_getter: PG,
    erasure_coding: ErasureCoding,
) -> (
    impl Future<Output = ()> + Send + 'static,
    impl Future<Output = Result<(), sc_service::Error>> + Send + 'static,
)
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    NB: NetworkBlock<Block::Hash, NumberFor<Block>> + Send + 'static,
    Client: HeaderBackend<Block>
        + BlockBackend<Block>
        + BlockchainEvents<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>,
    PG: PieceGetter + Send + Sync + 'static,
{
    let (tx, rx) = mpsc::channel(0);
    let observer_fut = {
        let node = node.clone();
        let client = Arc::clone(&client);

        async move { create_observer(network_service.as_ref(), &node, client.as_ref(), tx).await }
    };
    let worker_fut = async move {
        create_worker(
            segment_headers_store,
            &node,
            client.as_ref(),
            import_queue_service.as_mut(),
            network_block,
            sync_target_block_number,
            pause_sync,
            rx,
            &piece_getter,
            &erasure_coding,
        )
        .await
    };
    (observer_fut, worker_fut)
}

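/// Observer that runs the imported-blocks and Substrate networking observers concurrently and
/// notifies the worker when DSN sync may be needed.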
async fn create_observer<Block, Client>(
    network_service: &dyn NetworkService,
    _node: &Node,
    client: &Client,
    notifications_sender: mpsc::Sender<NotificationReason>,
) where
    Block: BlockT,
    Client: BlockchainEvents<Block> + Send + Sync + 'static,
{
    // // Separate reactive observer for Subspace networking that is not a future
    // let _handler_id = node.on_num_established_peer_connections_change({
    //     // Assuming node is offline by default
    //     let last_online = Atomic::new(None::<Instant>);
    //     let notifications_sender = notifications_sender.clone();
    //
    //     Arc::new(move |&new_connections| {
    //         let is_online = new_connections > 0;
    //         let was_online = last_online
    //             .load(Ordering::AcqRel)
    //             .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
    //             .unwrap_or_default();
    //
    //         if is_online && !was_online {
    //             // Doesn't matter if sending failed here
    //             let _ = notifications_sender
    //                 .clone()
    //                 .try_send(NotificationReason::WentOnlineSubspace);
    //         }
    //
    //         if is_online {
    //             last_online.store(Some(Instant::now()), Ordering::Release);
    //         }
    //     })
    // });
    select! {
        _ = create_imported_blocks_observer(client, notifications_sender.clone()).fuse() => {
            // Runs indefinitely
        }
        _ = create_substrate_network_observer(network_service, notifications_sender).fuse() => {
            // Runs indefinitely
        }
    }
}

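/// Sends [`NotificationReason::NoImportedBlocks`] whenever no block has been imported for
/// [`NO_IMPORTED_BLOCKS_TIMEOUT`]; returns once the import notification stream ends.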
async fn create_imported_blocks_observer<Block, Client>(
    client: &Client,
    mut notifications_sender: mpsc::Sender<NotificationReason>,
) where
    Block: BlockT,
    Client: BlockchainEvents<Block> + Send + Sync + 'static,
{
    let mut import_notification_stream = client.every_import_notification_stream();
    loop {
        match tokio::time::timeout(
            NO_IMPORTED_BLOCKS_TIMEOUT,
            import_notification_stream.next(),
        )
        .await
        {
            Ok(Some(_notification)) => {
                // Do nothing
            }
            Ok(None) => {
                // No more notifications
                return;
            }
            Err(_timeout) => {
                if let Err(error) =
                    notifications_sender.try_send(NotificationReason::NoImportedBlocks)
                {
                    if error.is_disconnected() {
                        // Receiving side was closed
                        return;
                    }
                }
            }
        }
    }
}

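/// Sends [`NotificationReason::WentOnlineSubstrate`] when the node goes from offline to online,
/// judged by the number of connected Substrate sync peers and [`MIN_OFFLINE_PERIOD`].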
async fn create_substrate_network_observer(
    network_service: &dyn NetworkService,
    mut notifications_sender: mpsc::Sender<NotificationReason>,
) {
    // Assuming node is offline by default
    let mut last_online = None::<Instant>;

    loop {
        tokio::time::sleep(CHECK_ONLINE_STATUS_INTERVAL).await;

        let is_online = network_service.sync_num_connected() > 0;

        let was_online = last_online
            .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
            .unwrap_or_default();
        if is_online && !was_online {
            if let Err(error) =
                notifications_sender.try_send(NotificationReason::WentOnlineSubstrate)
            {
                if error.is_disconnected() {
                    // Receiving side was closed
                    return;
                }
            }
        }

        if is_online {
            last_online.replace(Instant::now());
        }
    }
}

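/// Worker that, on each notification from the observer, pauses regular Substrate sync and imports
/// blocks from the DSN until either the DSN import finishes or the node is almost synced.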
#[allow(clippy::too_many_arguments)]
async fn create_worker<Block, AS, IQS, NB, Client, PG>(
    segment_headers_store: SegmentHeadersStore<AS>,
    node: &Node,
    client: &Client,
    import_queue_service: &mut IQS,
    network_block: NB,
    sync_target_block_number: Arc<AtomicU32>,
    pause_sync: Arc<AtomicBool>,
    mut notifications: mpsc::Receiver<NotificationReason>,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> Result<(), sc_service::Error>
where
    Block: BlockT,
    AS: AuxStore + Send + Sync + 'static,
    IQS: ImportQueueService<Block> + ?Sized,
    NB: NetworkBlock<Block::Hash, NumberFor<Block>>,
    Client: HeaderBackend<Block>
        + BlockBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: SubspaceApi<Block, PublicKey>,
    PG: PieceGetter,
{
    let info = client.info();
    let chain_constants = client
        .runtime_api()
        .chain_constants(info.best_hash)
        .map_err(|error| error.to_string())?;

    // Corresponds to the contents of block one; everyone has it, so we consider it processed
    // right away
    let mut last_processed_segment_index = SegmentIndex::ZERO;
    // TODO: We'll be able to just take finalized block once we are able to decouple pruning from
    //  finality: https://github.com/paritytech/polkadot-sdk/issues/1570
    let mut last_processed_block_number = info.best_number;
    let segment_header_downloader = SegmentHeaderDownloader::new(node);

    while let Some(reason) = notifications.next().await {
        pause_sync.store(true, Ordering::Release);

        info!(target: LOG_TARGET, ?reason, "Received notification to sync from DSN");
        // TODO: Maybe handle failed block imports, additional helpful logging
        let import_blocks_from_dsn_fut = import_blocks_from_dsn(
            &segment_headers_store,
            &segment_header_downloader,
            client,
            piece_getter,
            import_queue_service,
            &mut last_processed_segment_index,
            &mut last_processed_block_number,
            erasure_coding,
        );
        let wait_almost_synced_fut = async {
            loop {
                tokio::time::sleep(CHECK_ALMOST_SYNCED_INTERVAL).await;

                let info = client.info();
                let target_block_number =
                    NumberFor::<Block>::from(sync_target_block_number.load(Ordering::Relaxed));

                // If there are fewer blocks than the confirmation depth between the best block and
                // the tip of the chain, DSN sync is no longer worth worrying about; it will not be
                // helpful anyway
                if target_block_number
                    .checked_sub(&info.best_number)
                    .map(|diff| diff < chain_constants.confirmation_depth_k().into())
                    .unwrap_or_default()
                {
                    break;
                }
            }
        };

        select! {
            result = import_blocks_from_dsn_fut.fuse() => {
                if let Err(error) = result {
                    warn!(target: LOG_TARGET, %error, "Error when syncing blocks from DSN");
                }
            }
            _ = wait_almost_synced_fut.fuse() => {
                // Almost synced, DSN sync can't possibly help here
            }
        }

        debug!(target: LOG_TARGET, "Finished DSN sync");

        // This will notify Substrate's sync mechanism and allow regular Substrate sync to continue
        // gracefully
        {
            let info = client.info();
            network_block.new_best_block_imported(info.best_hash, info.best_number);
        }
        pause_sync.store(false, Ordering::Release);

        while notifications.try_next().is_ok() {
            // Just drain extra messages if there are any
        }
    }

    Ok(())
}