1pub(crate) mod import_blocks;
2pub(crate) mod piece_validator;
3pub(crate) mod segment_header_downloader;
4pub(crate) mod snap_sync;
5
6use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn;
7use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
8use async_trait::async_trait;
9use futures::channel::mpsc;
10use futures::{select, FutureExt, Stream, StreamExt};
11use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents};
12use sc_consensus::import_queue::ImportQueueService;
13use sc_consensus_subspace::archiver::SegmentHeadersStore;
14use sc_network::service::traits::NetworkService;
15use sc_network::NetworkBlock;
16use sp_api::ProvideRuntimeApi;
17use sp_blockchain::HeaderBackend;
18use sp_consensus_subspace::SubspaceApi;
19use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
20use std::fmt;
21use std::future::Future;
22use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use subspace_core_primitives::pieces::{Piece, PieceIndex};
26use subspace_core_primitives::segments::SegmentIndex;
27use subspace_core_primitives::PublicKey;
28use subspace_data_retrieval::piece_getter::PieceGetter;
29use subspace_erasure_coding::ErasureCoding;
30use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
31use subspace_networking::Node;
32use tracing::{debug, info, warn};
33
/// Longest wait for the next block import notification before the imported blocks observer
/// sends a `NoImportedBlocks` notification (10 minutes).
const NO_IMPORTED_BLOCKS_TIMEOUT: Duration = Duration::from_secs(60 * 10);
/// Pause between two consecutive polls of `sync_num_connected()` in the network observer.
const CHECK_ONLINE_STATUS_INTERVAL: Duration = Duration::from_secs(1);
/// Pause between two consecutive checks of whether the best block is close to the sync target.
const CHECK_ALMOST_SYNCED_INTERVAL: Duration = Duration::from_secs(1);
/// A node last seen online less than this long ago is still treated as having been online, so no
/// `WentOnlineSubstrate` notification is sent for it.
const MIN_OFFLINE_PERIOD: Duration = Duration::from_secs(60);
42
/// `tracing` target attached to the log messages emitted by the DSN sync worker.
pub(crate) const LOG_TARGET: &'static str = "consensus_sync";
44
45pub struct DsnPieceGetter<PV: PieceValidator>(PieceProvider<PV>);
47
48impl<PV> fmt::Debug for DsnPieceGetter<PV>
49where
50 PV: PieceValidator,
51{
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_tuple("DsnPieceGetter")
54 .field(&format!("{:?}", self.0))
55 .finish()
56 }
57}
58
59#[async_trait]
60impl<PV> PieceGetter for DsnPieceGetter<PV>
61where
62 PV: PieceValidator,
63{
64 #[inline]
65 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
66 Ok(self.0.get_piece_from_cache(piece_index).await)
67 }
68
69 #[inline]
70 async fn get_pieces<'a>(
71 &'a self,
72 piece_indices: Vec<PieceIndex>,
73 ) -> anyhow::Result<
74 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
75 > {
76 let stream = self
77 .0
78 .get_from_cache(piece_indices)
79 .await
80 .map(|(piece_index, maybe_piece)| (piece_index, Ok(maybe_piece)));
81 Ok(Box::new(stream))
82 }
83}
84
85impl<PV> DsnPieceGetter<PV>
86where
87 PV: PieceValidator,
88{
89 pub fn new(piece_provider: PieceProvider<PV>) -> Self {
91 Self(piece_provider)
92 }
93}
94
/// Reason carried by a notification that asks the worker to start a sync from DSN.
///
/// The worker only logs the reason (via `Debug`); every variant triggers the same sync.
#[derive(Debug)]
enum NotificationReason {
    /// No block import notification arrived within `NO_IMPORTED_BLOCKS_TIMEOUT`.
    NoImportedBlocks,
    /// NOTE(review): presumably meant for the Subspace networking stack coming online; no code
    /// visible in this file constructs it, hence the `dead_code` override below.
    #[allow(dead_code)]
    WentOnlineSubspace,
    /// Substrate networking reported connected peers after not having been seen online for at
    /// least `MIN_OFFLINE_PERIOD`.
    WentOnlineSubstrate,
}
103
104#[allow(clippy::too_many_arguments)]
107pub(super) fn create_observer_and_worker<Block, AS, NB, Client, PG>(
108 segment_headers_store: SegmentHeadersStore<AS>,
109 network_service: Arc<dyn NetworkService>,
110 node: Node,
111 client: Arc<Client>,
112 mut import_queue_service: Box<dyn ImportQueueService<Block>>,
113 network_block: NB,
114 sync_target_block_number: Arc<AtomicU32>,
115 pause_sync: Arc<AtomicBool>,
116 piece_getter: PG,
117 erasure_coding: ErasureCoding,
118) -> (
119 impl Future<Output = ()> + Send + 'static,
120 impl Future<Output = Result<(), sc_service::Error>> + Send + 'static,
121)
122where
123 Block: BlockT,
124 AS: AuxStore + Send + Sync + 'static,
125 NB: NetworkBlock<Block::Hash, NumberFor<Block>> + Send + 'static,
126 Client: HeaderBackend<Block>
127 + BlockBackend<Block>
128 + BlockchainEvents<Block>
129 + ProvideRuntimeApi<Block>
130 + Send
131 + Sync
132 + 'static,
133 Client::Api: SubspaceApi<Block, PublicKey>,
134 PG: PieceGetter + Send + Sync + 'static,
135{
136 let (tx, rx) = mpsc::channel(0);
137 let observer_fut = {
138 let node = node.clone();
139 let client = Arc::clone(&client);
140
141 async move { create_observer(network_service.as_ref(), &node, client.as_ref(), tx).await }
142 };
143 let worker_fut = async move {
144 create_worker(
145 segment_headers_store,
146 &node,
147 client.as_ref(),
148 import_queue_service.as_mut(),
149 network_block,
150 sync_target_block_number,
151 pause_sync,
152 rx,
153 &piece_getter,
154 &erasure_coding,
155 )
156 .await
157 };
158 (observer_fut, worker_fut)
159}
160
161async fn create_observer<Block, Client>(
162 network_service: &dyn NetworkService,
163 _node: &Node,
164 client: &Client,
165 notifications_sender: mpsc::Sender<NotificationReason>,
166) where
167 Block: BlockT,
168 Client: BlockchainEvents<Block> + Send + Sync + 'static,
169{
170 select! {
196 _ = create_imported_blocks_observer(client, notifications_sender.clone()).fuse() => {
197 }
199 _ = create_substrate_network_observer(network_service, notifications_sender).fuse() => {
200 }
202 }
203}
204
205async fn create_imported_blocks_observer<Block, Client>(
206 client: &Client,
207 mut notifications_sender: mpsc::Sender<NotificationReason>,
208) where
209 Block: BlockT,
210 Client: BlockchainEvents<Block> + Send + Sync + 'static,
211{
212 let mut import_notification_stream = client.every_import_notification_stream();
213 loop {
214 match tokio::time::timeout(
215 NO_IMPORTED_BLOCKS_TIMEOUT,
216 import_notification_stream.next(),
217 )
218 .await
219 {
220 Ok(Some(_notification)) => {
221 }
223 Ok(None) => {
224 return;
226 }
227 Err(_timeout) => {
228 if let Err(error) =
229 notifications_sender.try_send(NotificationReason::NoImportedBlocks)
230 {
231 if error.is_disconnected() {
232 return;
234 }
235 }
236 }
237 }
238 }
239}
240
241async fn create_substrate_network_observer(
242 network_service: &dyn NetworkService,
243 mut notifications_sender: mpsc::Sender<NotificationReason>,
244) {
245 let mut last_online = None::<Instant>;
247
248 loop {
249 tokio::time::sleep(CHECK_ONLINE_STATUS_INTERVAL).await;
250
251 let is_online = network_service.sync_num_connected() > 0;
252
253 let was_online = last_online
254 .map(|last_online| last_online.elapsed() < MIN_OFFLINE_PERIOD)
255 .unwrap_or_default();
256 if is_online && !was_online {
257 if let Err(error) =
258 notifications_sender.try_send(NotificationReason::WentOnlineSubstrate)
259 {
260 if error.is_disconnected() {
261 return;
263 }
264 }
265 }
266
267 if is_online {
268 last_online.replace(Instant::now());
269 }
270 }
271}
272
273#[allow(clippy::too_many_arguments)]
274async fn create_worker<Block, AS, IQS, NB, Client, PG>(
275 segment_headers_store: SegmentHeadersStore<AS>,
276 node: &Node,
277 client: &Client,
278 import_queue_service: &mut IQS,
279 network_block: NB,
280 sync_target_block_number: Arc<AtomicU32>,
281 pause_sync: Arc<AtomicBool>,
282 mut notifications: mpsc::Receiver<NotificationReason>,
283 piece_getter: &PG,
284 erasure_coding: &ErasureCoding,
285) -> Result<(), sc_service::Error>
286where
287 Block: BlockT,
288 AS: AuxStore + Send + Sync + 'static,
289 IQS: ImportQueueService<Block> + ?Sized,
290 NB: NetworkBlock<Block::Hash, NumberFor<Block>>,
291 Client: HeaderBackend<Block>
292 + BlockBackend<Block>
293 + ProvideRuntimeApi<Block>
294 + Send
295 + Sync
296 + 'static,
297 Client::Api: SubspaceApi<Block, PublicKey>,
298 PG: PieceGetter,
299{
300 let info = client.info();
301 let chain_constants = client
302 .runtime_api()
303 .chain_constants(info.best_hash)
304 .map_err(|error| error.to_string())?;
305
306 let mut last_processed_segment_index = SegmentIndex::ZERO;
309 let mut last_processed_block_number = info.best_number;
312 let segment_header_downloader = SegmentHeaderDownloader::new(node);
313
314 while let Some(reason) = notifications.next().await {
315 pause_sync.store(true, Ordering::Release);
316
317 info!(target: LOG_TARGET, ?reason, "Received notification to sync from DSN");
318 let import_blocks_from_dsn_fut = import_blocks_from_dsn(
320 &segment_headers_store,
321 &segment_header_downloader,
322 client,
323 piece_getter,
324 import_queue_service,
325 &mut last_processed_segment_index,
326 &mut last_processed_block_number,
327 erasure_coding,
328 );
329 let wait_almost_synced_fut = async {
330 loop {
331 tokio::time::sleep(CHECK_ALMOST_SYNCED_INTERVAL).await;
332
333 let info = client.info();
334 let target_block_number =
335 NumberFor::<Block>::from(sync_target_block_number.load(Ordering::Relaxed));
336
337 if target_block_number
340 .checked_sub(&info.best_number)
341 .map(|diff| diff < chain_constants.confirmation_depth_k().into())
342 .unwrap_or_default()
343 {
344 break;
345 }
346 }
347 };
348
349 select! {
350 result = import_blocks_from_dsn_fut.fuse() => {
351 if let Err(error) = result {
352 warn!(target: LOG_TARGET, %error, "Error when syncing blocks from DSN");
353 }
354 }
355 _ = wait_almost_synced_fut.fuse() => {
356 }
358 }
359
360 debug!(target: LOG_TARGET, "Finished DSN sync");
361
362 {
365 let info = client.info();
366 network_block.new_best_block_imported(info.best_hash, info.best_number);
367 }
368 pause_sync.store(false, Ordering::Release);
369
370 while notifications.try_next().is_ok() {
371 }
373 }
374
375 Ok(())
376}