1pub(crate) mod import_blocks;
2pub(crate) mod piece_validator;
3pub(crate) mod segment_header_downloader;
4pub(crate) mod snap_sync;
5
6use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn;
7use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
8use async_trait::async_trait;
9use futures::channel::mpsc;
10use futures::{FutureExt, Stream, StreamExt, select};
11use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents};
12use sc_consensus::import_queue::ImportQueueService;
13use sc_consensus_subspace::archiver::SegmentHeadersStore;
14use sc_network::NetworkBlock;
15use sc_network::service::traits::NetworkService;
16use sp_api::ProvideRuntimeApi;
17use sp_blockchain::HeaderBackend;
18use sp_consensus_subspace::SubspaceApi;
19use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
20use std::fmt;
21use std::future::Future;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
24use std::time::{Duration, Instant};
25use subspace_core_primitives::PublicKey;
26use subspace_core_primitives::pieces::{Piece, PieceIndex};
27use subspace_core_primitives::segments::SegmentIndex;
28use subspace_data_retrieval::piece_getter::PieceGetter;
29use subspace_erasure_coding::ErasureCoding;
30use subspace_networking::Node;
31use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
32use tracing::{debug, info, warn};
33
/// How long the imported blocks observer waits for a block import notification before it sends
/// a `NoImportedBlocks` notification.
const NO_IMPORTED_BLOCKS_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Pause between two consecutive checks of the Substrate network connection status.
const CHECK_ONLINE_STATUS_INTERVAL: Duration = Duration::from_secs(1);
/// Pause between two consecutive "almost synced" checks performed while DSN sync is running.
const CHECK_ALMOST_SYNCED_INTERVAL: Duration = Duration::from_secs(1);
/// A node that was last seen online less than this long ago is still treated as having been
/// online, so a `WentOnlineSubstrate` notification is only sent after a longer offline period.
const MIN_OFFLINE_PERIOD: Duration = Duration::from_secs(60);
42
/// Newtype around [`PieceProvider`] that exposes it through the [`PieceGetter`] trait.
///
/// The trait implementation delegates to the provider's `get_piece_from_cache` and
/// `get_from_cache` methods.
pub struct DsnPieceGetter<PV: PieceValidator>(PieceProvider<PV>);
45
46impl<PV> fmt::Debug for DsnPieceGetter<PV>
47where
48 PV: PieceValidator,
49{
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_tuple("DsnPieceGetter")
52 .field(&format!("{:?}", self.0))
53 .finish()
54 }
55}
56
57#[async_trait]
58impl<PV> PieceGetter for DsnPieceGetter<PV>
59where
60 PV: PieceValidator,
61{
62 #[inline]
63 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
64 Ok(self.0.get_piece_from_cache(piece_index).await)
65 }
66
67 #[inline]
68 async fn get_pieces<'a>(
69 &'a self,
70 piece_indices: Vec<PieceIndex>,
71 ) -> anyhow::Result<
72 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
73 > {
74 let stream = self
75 .0
76 .get_from_cache(piece_indices)
77 .await
78 .map(|(piece_index, maybe_piece)| (piece_index, Ok(maybe_piece)));
79 Ok(Box::new(stream))
80 }
81}
82
83impl<PV> DsnPieceGetter<PV>
84where
85 PV: PieceValidator,
86{
87 pub fn new(piece_provider: PieceProvider<PV>) -> Self {
89 Self(piece_provider)
90 }
91}
92
/// Reason attached to a notification sent from the observers to the DSN sync worker.
#[derive(Debug)]
enum NotificationReason {
    /// No block import notification arrived within `NO_IMPORTED_BLOCKS_TIMEOUT`.
    NoImportedBlocks,
    /// Not constructed anywhere in the code visible in this file, hence the `dead_code`
    /// allowance.
    // NOTE(review): presumably reserved for an online observer of the Subspace network — confirm.
    #[allow(dead_code)]
    WentOnlineSubspace,
    /// The Substrate network service reported a non-zero `sync_num_connected()` and the node was
    /// not seen online within the preceding `MIN_OFFLINE_PERIOD`.
    WentOnlineSubstrate,
}
101
102#[allow(clippy::too_many_arguments)]
105pub(super) fn create_observer_and_worker<Block, AS, NB, Client, PG>(
106 segment_headers_store: SegmentHeadersStore<AS>,
107 network_service: Arc<dyn NetworkService>,
108 node: Node,
109 client: Arc<Client>,
110 mut import_queue_service: Box<dyn ImportQueueService<Block>>,
111 network_block: NB,
112 sync_target_block_number: Arc<AtomicU32>,
113 pause_sync: Arc<AtomicBool>,
114 piece_getter: PG,
115 erasure_coding: ErasureCoding,
116) -> (
117 impl Future<Output = ()> + Send + 'static,
118 impl Future<Output = Result<(), sc_service::Error>> + Send + 'static,
119)
120where
121 Block: BlockT,
122 AS: AuxStore + Send + Sync + 'static,
123 NB: NetworkBlock<Block::Hash, NumberFor<Block>> + Send + 'static,
124 Client: HeaderBackend<Block>
125 + BlockBackend<Block>
126 + BlockchainEvents<Block>
127 + ProvideRuntimeApi<Block>
128 + Send
129 + Sync
130 + 'static,
131 Client::Api: SubspaceApi<Block, PublicKey>,
132 PG: PieceGetter + Send + Sync + 'static,
133{
134 let (tx, rx) = mpsc::channel(0);
135 let observer_fut = {
136 let node = node.clone();
137 let client = Arc::clone(&client);
138
139 async move { create_observer(network_service.as_ref(), &node, client.as_ref(), tx).await }
140 };
141 let worker_fut = async move {
142 create_worker(
143 segment_headers_store,
144 &node,
145 client.as_ref(),
146 import_queue_service.as_mut(),
147 network_block,
148 sync_target_block_number,
149 pause_sync,
150 rx,
151 &piece_getter,
152 &erasure_coding,
153 )
154 .await
155 };
156 (observer_fut, worker_fut)
157}
158
159async fn create_observer<Block, Client>(
160 network_service: &dyn NetworkService,
161 _node: &Node,
162 client: &Client,
163 notifications_sender: mpsc::Sender<NotificationReason>,
164) where
165 Block: BlockT,
166 Client: BlockchainEvents<Block> + Send + Sync + 'static,
167{
168 select! {
194 _ = create_imported_blocks_observer(client, notifications_sender.clone()).fuse() => {
195 }
197 _ = create_substrate_network_observer(network_service, notifications_sender).fuse() => {
198 }
200 }
201}
202
203async fn create_imported_blocks_observer<Block, Client>(
204 client: &Client,
205 mut notifications_sender: mpsc::Sender<NotificationReason>,
206) where
207 Block: BlockT,
208 Client: BlockchainEvents<Block> + Send + Sync + 'static,
209{
210 let mut import_notification_stream = client.every_import_notification_stream();
211 loop {
212 match tokio::time::timeout(
213 NO_IMPORTED_BLOCKS_TIMEOUT,
214 import_notification_stream.next(),
215 )
216 .await
217 {
218 Ok(Some(_notification)) => {
219 }
221 Ok(None) => {
222 return;
224 }
225 Err(_timeout) => {
226 if let Err(error) =
227 notifications_sender.try_send(NotificationReason::NoImportedBlocks)
228 && error.is_disconnected()
229 {
230 return;
232 }
233 }
234 }
235 }
236}
237
238async fn create_substrate_network_observer(
239 network_service: &dyn NetworkService,
240 mut notifications_sender: mpsc::Sender<NotificationReason>,
241) {
242 let mut last_online = None::<Instant>;
244
245 loop {
246 tokio::time::sleep(CHECK_ONLINE_STATUS_INTERVAL).await;
247
248 let is_online = network_service.sync_num_connected() > 0;
249
250 let was_online = last_online
251 .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
252 .unwrap_or_default();
253 if is_online
254 && !was_online
255 && let Err(error) =
256 notifications_sender.try_send(NotificationReason::WentOnlineSubstrate)
257 && error.is_disconnected()
258 {
259 return;
261 }
262
263 if is_online {
264 last_online.replace(Instant::now());
265 }
266 }
267}
268
269#[allow(clippy::too_many_arguments)]
270async fn create_worker<Block, AS, IQS, NB, Client, PG>(
271 segment_headers_store: SegmentHeadersStore<AS>,
272 node: &Node,
273 client: &Client,
274 import_queue_service: &mut IQS,
275 network_block: NB,
276 sync_target_block_number: Arc<AtomicU32>,
277 pause_sync: Arc<AtomicBool>,
278 mut notifications: mpsc::Receiver<NotificationReason>,
279 piece_getter: &PG,
280 erasure_coding: &ErasureCoding,
281) -> Result<(), sc_service::Error>
282where
283 Block: BlockT,
284 AS: AuxStore + Send + Sync + 'static,
285 IQS: ImportQueueService<Block> + ?Sized,
286 NB: NetworkBlock<Block::Hash, NumberFor<Block>>,
287 Client: HeaderBackend<Block>
288 + BlockBackend<Block>
289 + ProvideRuntimeApi<Block>
290 + Send
291 + Sync
292 + 'static,
293 Client::Api: SubspaceApi<Block, PublicKey>,
294 PG: PieceGetter,
295{
296 let info = client.info();
297 let chain_constants = client
298 .runtime_api()
299 .chain_constants(info.best_hash)
300 .map_err(|error| error.to_string())?;
301
302 let mut last_completed_segment_index = SegmentIndex::ZERO;
309
310 let mut last_processed_block_number = info.best_number;
316 let segment_header_downloader = SegmentHeaderDownloader::new(node);
317
318 while let Some(reason) = notifications.next().await {
319 info!(
320 ?reason,
321 ?last_completed_segment_index,
322 ?last_processed_block_number,
323 "Received notification to sync from DSN, deactivating substrate sync"
324 );
325 pause_sync.store(true, Ordering::Release);
326
327 let import_blocks_from_dsn_fut = import_blocks_from_dsn(
329 &segment_headers_store,
330 &segment_header_downloader,
331 client,
332 piece_getter,
333 import_queue_service,
334 &mut last_completed_segment_index,
335 &mut last_processed_block_number,
336 erasure_coding,
337 );
338 let wait_almost_synced_fut = async {
339 loop {
340 tokio::time::sleep(CHECK_ALMOST_SYNCED_INTERVAL).await;
341
342 let info = client.info();
343 let target_block_number =
344 NumberFor::<Block>::from(sync_target_block_number.load(Ordering::Relaxed));
345
346 if target_block_number
349 .checked_sub(&info.best_number)
350 .map(|diff| diff < chain_constants.confirmation_depth_k().into())
351 .unwrap_or_default()
352 {
353 debug!(
354 best_block = ?info.best_number,
355 ?target_block_number,
356 "Node is almost synced, stopping DSN sync until the next notification"
357 );
358 break;
359 }
360 }
361 };
362
363 select! {
364 result = import_blocks_from_dsn_fut.fuse() => {
365 if let Err(error) = result {
366 warn!(
367 %error,
368 ?last_completed_segment_index,
369 ?last_processed_block_number,
370 "Error when syncing blocks from DSN, stopping DSN sync until the next notification"
371 );
372 }
373 }
374 _ = wait_almost_synced_fut.fuse() => {
375 }
377 }
378
379 while notifications.try_next().is_ok() {
380 }
382
383 debug!(
387 ?last_completed_segment_index,
388 ?last_processed_block_number,
389 "Finished DSN sync, activating substrate sync"
390 );
391 {
392 let info = client.info();
393 network_block.new_best_block_imported(info.best_hash, info.best_number);
394 }
395 pause_sync.store(false, Ordering::Release);
396 }
397
398 Ok(())
399}