1use domain_runtime_primitives::{Balance, BlockNumber};
2use futures::channel::mpsc;
3use futures::{SinkExt, Stream, StreamExt};
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sc_consensus::{
6 BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, StateAction, StorageChanges,
7};
8use sc_network::PeerId;
9use sc_network_common::sync::message::{
10 BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
11};
12use sc_network_sync::SyncingService;
13use sc_network_sync::block_relay_protocol::BlockDownloader;
14use sc_network_sync::service::network::NetworkServiceHandle;
15use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
16use sp_blockchain::HeaderBackend;
17use sp_consensus::BlockOrigin;
18use sp_domains::ExecutionReceiptFor;
19use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Header, NumberFor};
20use std::collections::HashSet;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24use tokio::sync::broadcast;
25use tokio::time::sleep;
26use tracing::{Instrument, debug, error, trace};
27
28#[derive(Debug, Clone)]
31pub struct BlockImportingAcknowledgement<Block>
32where
33 Block: BlockT,
34{
35 pub block_number: NumberFor<Block>,
37 pub acknowledgement_sender: mpsc::Sender<()>,
40}
41
42pub struct ConsensusChainSyncParams<Block, DomainHeader>
44where
45 Block: BlockT,
46 DomainHeader: Header,
47{
48 pub snap_sync_orchestrator: Arc<SnapSyncOrchestrator>,
50 pub last_domain_block_er: ExecutionReceiptFor<DomainHeader, Block, Balance>,
52 pub block_importing_notification_stream:
54 Box<dyn Stream<Item = BlockImportingAcknowledgement<Block>> + Sync + Send + Unpin>,
55}
56
/// Shared state used to coordinate the consensus chain snap sync with the
/// domain snap sync.
pub struct SnapSyncOrchestrator {
    /// Broadcast sender on which the consensus snap sync target block number
    /// is published.
    consensus_snap_sync_target_block_tx: broadcast::Sender<BlockNumber>,
    /// Flag that is set to `true` once the domain snap sync has finished.
    domain_snap_sync_finished: Arc<AtomicBool>,
}
62
// `Default` delegates to `SnapSyncOrchestrator::new`, so both construction
// paths produce the same initial state.
impl Default for SnapSyncOrchestrator {
    fn default() -> Self {
        Self::new()
    }
}
68
69impl SnapSyncOrchestrator {
70 pub fn new() -> Self {
72 let (tx, _) = broadcast::channel(1);
73 Self {
74 consensus_snap_sync_target_block_tx: tx,
75 domain_snap_sync_finished: Arc::new(AtomicBool::new(false)),
76 }
77 }
78
79 pub fn unblock_consensus_snap_sync(&self, target_block_number: BlockNumber) {
81 debug!(%target_block_number, "Allowed starting consensus chain snap sync.");
82
83 let target_block_send_result = self
84 .consensus_snap_sync_target_block_tx
85 .send(target_block_number);
86
87 debug!(
88 ?target_block_send_result,
89 "Target block sending result: {target_block_number}"
90 );
91 }
92
93 pub fn domain_snap_sync_finished(&self) -> Arc<AtomicBool> {
95 self.domain_snap_sync_finished.clone()
96 }
97
98 pub fn consensus_snap_sync_target_block_receiver(&self) -> broadcast::Receiver<BlockNumber> {
100 self.consensus_snap_sync_target_block_tx.subscribe()
101 }
102
103 pub fn mark_domain_snap_sync_finished(&self) {
105 debug!("Signal that domain snap sync finished.");
106 self.domain_snap_sync_finished
107 .store(true, Ordering::Release);
108 }
109}
110
111pub struct SyncParams<DomainClient, Block, CBlock>
112where
113 Block: BlockT,
114 CBlock: BlockT,
115{
116 pub domain_client: Arc<DomainClient>,
117 pub sync_service: Arc<SyncingService<Block>>,
118 pub domain_fork_id: Option<String>,
119 pub domain_network_service_handle: NetworkServiceHandle,
120 pub domain_block_downloader: Arc<dyn BlockDownloader<Block>>,
121 pub consensus_chain_sync_params: ConsensusChainSyncParams<CBlock, Block::Header>,
122 pub challenge_period: NumberFor<CBlock>,
123}
124
125async fn get_last_confirmed_block<Block: BlockT>(
126 block_downloader: Arc<dyn BlockDownloader<Block>>,
127 sync_service: &SyncingService<Block>,
128 block_number: BlockNumber,
129) -> Result<BlockData<Block>, sp_blockchain::Error> {
130 const LAST_CONFIRMED_BLOCK_RETRIES: u32 = 5;
131 const LOOP_PAUSE: Duration = Duration::from_secs(20);
132 const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
133
134 for attempt in 1..=LAST_CONFIRMED_BLOCK_RETRIES {
135 debug!(%attempt, %block_number, "Starting last confirmed block request...");
136
137 debug!(%block_number, "Gathering peers for last confirmed block request.");
138 let mut tried_peers = HashSet::<PeerId>::new();
139
140 let current_peer_id = match get_currently_connected_peer(
141 sync_service,
142 &mut tried_peers,
143 LOOP_PAUSE,
144 MAX_GET_PEERS_ATTEMPT_NUMBER,
145 )
146 .instrument(tracing::info_span!("last confirmed block"))
147 .await
148 {
149 Ok(peer_id) => peer_id,
150 Err(err) => {
151 debug!(?err, "Getting peers for the last confirmed block failed");
152 continue;
153 }
154 };
155 tried_peers.insert(current_peer_id);
156
157 let id = {
158 let now = SystemTime::now();
159 let duration_since_epoch = now
160 .duration_since(UNIX_EPOCH)
161 .expect("Time usually goes forward");
162
163 duration_since_epoch.as_nanos() as u64
164 };
165
166 let block_request = BlockRequest::<Block> {
167 id,
168 direction: Direction::Ascending,
169 from: FromBlock::Number(block_number.into()),
170 max: Some(1),
171 fields: BlockAttributes::HEADER
172 | BlockAttributes::JUSTIFICATION
173 | BlockAttributes::BODY
174 | BlockAttributes::RECEIPT
175 | BlockAttributes::MESSAGE_QUEUE
176 | BlockAttributes::INDEXED_BODY,
177 };
178 let block_response_result = block_downloader
179 .download_blocks(current_peer_id, block_request.clone())
180 .await;
181
182 match block_response_result {
183 Ok(block_response_inner_result) => {
184 trace!(
185 %block_number,
186 "Sync worker handle result: {:?}",
187 block_response_inner_result.as_ref().map(|(block_data, protocol_name)| (hex::encode(block_data), protocol_name))
188 );
189
190 match block_response_inner_result {
191 Ok(data) => {
192 match block_downloader.block_response_into_blocks(&block_request, data.0) {
193 Ok(mut blocks) => {
194 trace!(%block_number, "Domain block parsing result: {:?}", blocks);
195
196 if let Some(blocks) = blocks.pop() {
197 return Ok(blocks);
198 } else {
199 trace!( %current_peer_id, "Got empty state blocks",);
200 continue;
201 }
202 }
203 Err(error) => {
204 error!(%block_number, ?error, "Domain block parsing error");
205 continue;
206 }
207 }
208 }
209 Err(error) => {
210 error!(%block_number, ?error, "Domain block sync error (inner)");
211 continue;
212 }
213 }
214 }
215 Err(error) => {
216 error!(%block_number, ?error, "Domain block sync error");
217 continue;
218 }
219 }
220 }
221
222 Err(sp_blockchain::Error::Application(
223 format!("Failed to get block {block_number}").into(),
224 ))
225}
226
227fn convert_block_number<Block: BlockT>(block_number: NumberFor<Block>) -> u32 {
228 let block_number: u32 = match block_number.try_into() {
229 Ok(block_number) => block_number,
230 Err(_) => {
231 panic!("Can't convert block number.")
232 }
233 };
234
235 block_number
236}
237
/// Runs the domain snap sync towards the domain block described by
/// `sync_params.consensus_chain_sync_params.last_domain_block_er`.
///
/// Steps, in order:
/// 1. Acknowledge block importing notifications up to and including the
///    receipt's consensus block number, then hold on to the acknowledgement
///    sender of the first later block without sending on it.
/// 2. Download the target domain block and its state from peers.
/// 3. Import the block together with the downloaded state as finalized.
/// 4. Check that the client now reports the expected hash for that number.
/// 5. Write the hash mapping and the execution receipt through `aux_schema`.
/// 6. Mark the domain snap sync as finished on the orchestrator.
///
/// # Errors
///
/// Returns an error when the receipt points at domain block 0, when an
/// acknowledgement cannot be sent, when the block or state download fails,
/// when the header is missing from the downloaded block, when the import
/// fails, when the resulting hash is missing or differs from the receipt, or
/// when an `aux_schema` write fails.
///
/// # Panics
///
/// Panics (inside `convert_block_number`) if the receipt's domain block
/// number does not fit into `u32`.
pub(crate) async fn snap_sync<Block, Client, CBlock>(
    sync_params: SyncParams<Client, Block, CBlock>,
) -> Result<(), sp_blockchain::Error>
where
    Block: BlockT,
    Client: HeaderBackend<Block>
        + BlockImport<Block>
        + AuxStore
        + ProofProvider<Block>
        + BlockchainEvents<Block>
        + Send
        + Sync
        + 'static,
    for<'a> &'a Client: BlockImport<Block>,
    CBlock: BlockT,
{
    let last_confirmed_block_receipt = sync_params.consensus_chain_sync_params.last_domain_block_er;

    // A receipt for domain block 0 is rejected up front.
    if last_confirmed_block_receipt.domain_block_number == 0u32.into() {
        return Err(sp_blockchain::Error::Application(
            "Can't snap sync from genesis.".into(),
        ));
    }

    let consensus_block_hash = last_confirmed_block_receipt.consensus_block_hash;

    let mut block_importing_notification_stream = sync_params
        .consensus_chain_sync_params
        .block_importing_notification_stream;

    // Stays `None` if the stream ends before a block past the receipt's
    // consensus block number is seen.
    let mut consensus_target_block_acknowledgement_sender = None;
    while let Some(mut block_notification) = block_importing_notification_stream.next().await {
        if block_notification.block_number <= last_confirmed_block_receipt.consensus_block_number {
            // Blocks up to and including the receipt's consensus block are
            // acknowledged immediately.
            if block_notification
                .acknowledgement_sender
                .send(())
                .await
                .is_err()
            {
                return Err(sp_blockchain::Error::Application(
                    format!(
                        "Can't acknowledge block import #{}",
                        block_notification.block_number
                    )
                    .into(),
                ));
            };
        } else {
            // First block past the receipt's consensus block: keep its sender
            // (no acknowledgement is sent on it) and stop reading the stream.
            consensus_target_block_acknowledgement_sender
                .replace(block_notification.acknowledgement_sender);
            break;
        }
    }

    let domain_block_number =
        convert_block_number::<Block>(last_confirmed_block_receipt.domain_block_number);

    let domain_block_hash = last_confirmed_block_receipt.domain_block_hash;
    let domain_block = get_last_confirmed_block(
        sync_params.domain_block_downloader,
        &sync_params.sync_service,
        domain_block_number,
    )
    .await?;

    let Some(domain_block_header) = domain_block.header else {
        return Err(sp_blockchain::Error::MissingHeader(
            "Can't obtain domain block header for snap sync".to_string(),
        ));
    };

    // The result is kept as-is so it can be logged below; a download failure
    // is propagated later, at the `state_result?` inside the import block.
    let state_result = download_state(
        &domain_block_header,
        &sync_params.domain_client,
        sync_params.domain_fork_id,
        &sync_params.domain_network_service_handle,
        &sync_params.sync_service,
    )
    .await;

    trace!("State downloaded: {:?}", state_result);

    {
        // Import the downloaded block with the downloaded state applied,
        // marked as finalized and with a custom fork choice of `true`.
        let client = sync_params.domain_client.clone();
        let mut block =
            BlockImportParams::new(BlockOrigin::NetworkInitialSync, domain_block_header);
        block.body = domain_block.body;
        block.justifications = domain_block.justifications;
        block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state_result?));
        block.finalized = true;
        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        client.as_ref().import_block(block).await.map_err(|error| {
            sp_blockchain::Error::Backend(format!("Failed to import state block: {error}"))
        })?;
    }

    trace!(
        "Domain client info after waiting: {:?}",
        sync_params.domain_client.info()
    );

    // Verify that the client now knows a block at the target number and that
    // its hash equals the hash from the receipt.
    if let Ok(Some(created_domain_block_hash)) =
        sync_params.domain_client.hash(domain_block_number.into())
    {
        if created_domain_block_hash == domain_block_hash {
            trace!(
                ?created_domain_block_hash,
                ?domain_block_hash,
                "Created hash matches after the domain block import with state",
            );
        } else {
            debug!(
                ?created_domain_block_hash,
                ?domain_block_hash,
                "Created hash doesn't match after the domain block import with state",
            );

            return Err(sp_blockchain::Error::Backend(
                "Created hash doesn't match after the domain block import with state".to_string(),
            ));
        }
    } else {
        return Err(sp_blockchain::Error::Backend(
            "Can't obtain domain block hash after state importing for snap sync".to_string(),
        ));
    }

    // Record the domain hash / consensus hash pair through `aux_schema`.
    // NOTE(review): the meaning of the trailing `false` is defined by the
    // helper and is not visible in this file.
    crate::aux_schema::track_domain_hash_and_consensus_hash::<_, Block, CBlock>(
        &sync_params.domain_client,
        domain_block_hash,
        consensus_block_hash,
        false,
    )?;

    // Persist the receipt through `aux_schema`.
    // NOTE(review): the meaning of the `None` argument is defined by the
    // helper and is not visible in this file.
    crate::aux_schema::write_execution_receipt::<_, Block, CBlock>(
        sync_params.domain_client.as_ref(),
        None,
        &last_confirmed_block_receipt,
        sync_params.challenge_period,
    )?;

    sync_params
        .consensus_chain_sync_params
        .snap_sync_orchestrator
        .mark_domain_snap_sync_finished();

    debug!(info = ?sync_params.domain_client.info(), "Client info after successful domain snap sync.");

    // The retained acknowledgement sender and the notification stream are
    // dropped explicitly, only after the sync has been marked as finished.
    // NOTE(review): presumably this releases the consensus block import that
    // waits on the acknowledgement — confirm against the sending side.
    drop(consensus_target_block_acknowledgement_sender);
    drop(block_importing_notification_stream);

    Ok(())
}
397
398async fn download_state<Block, Client>(
400 header: &Block::Header,
401 client: &Arc<Client>,
402 fork_id: Option<String>,
403 network_service_handle: &NetworkServiceHandle,
404 sync_service: &SyncingService<Block>,
405) -> Result<ImportedState<Block>, sp_blockchain::Error>
406where
407 Block: BlockT,
408 Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
409{
410 let block_number = *header.number();
411
412 const STATE_SYNC_RETRIES: u32 = 20;
413 const LOOP_PAUSE: Duration = Duration::from_secs(20);
414 const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
415
416 for attempt in 1..=STATE_SYNC_RETRIES {
417 debug!(%block_number, %attempt, "Starting state sync...");
418
419 debug!(%block_number, "Gathering peers for state sync.");
420 let mut tried_peers = HashSet::<PeerId>::new();
421
422 let current_peer_id = match get_currently_connected_peer(
423 sync_service,
424 &mut tried_peers,
425 LOOP_PAUSE,
426 MAX_GET_PEERS_ATTEMPT_NUMBER,
427 )
428 .instrument(tracing::info_span!("download state"))
429 .await
430 {
431 Ok(peer_id) => peer_id,
432 Err(err) => {
433 debug!(?err, "Getting peers for state downloading failed");
434 continue;
435 }
436 };
437 tried_peers.insert(current_peer_id);
438
439 let sync_engine = SnapSyncingEngine::<Block>::new(
440 client.clone(),
441 fork_id.as_deref(),
442 header.clone(),
443 false,
444 (current_peer_id, block_number),
445 network_service_handle,
446 )?;
447
448 let last_block_from_sync_result = sync_engine.download_state().await;
449
450 match last_block_from_sync_result {
451 Ok(block_to_import) => {
452 debug!(%block_number, "Sync worker handle result: {:?}", block_to_import);
453
454 return block_to_import.state.ok_or_else(|| {
455 sp_blockchain::Error::Backend(
456 "Imported state was missing in synced block".into(),
457 )
458 });
459 }
460 Err(error) => {
461 error!(%block_number, %error, "State sync error");
462 continue;
463 }
464 }
465 }
466
467 Err(sp_blockchain::Error::Backend(
468 "All snap sync retries failed".into(),
469 ))
470}
471
472async fn get_currently_connected_peer<Block>(
473 sync_service: &SyncingService<Block>,
474 tried_peers: &mut HashSet<PeerId>,
475 loop_pause: Duration,
476 max_attempts: usize,
477) -> Result<PeerId, sp_blockchain::Error>
478where
479 Block: BlockT,
480{
481 for current_attempt in 0..max_attempts {
482 let all_connected_peers = sync_service
483 .peers_info()
484 .await
485 .expect("Network service must be available.");
486
487 debug!(
488 %current_attempt,
489 ?all_connected_peers,
490 "Connected peers"
491 );
492
493 let connected_full_peers = all_connected_peers
494 .iter()
495 .filter_map(|(peer_id, info)| (info.roles.is_full()).then_some(*peer_id))
496 .collect::<Vec<_>>();
497
498 debug!(
499 %current_attempt,
500 ?tried_peers,
501 "Sync peers: {:?}", connected_full_peers
502 );
503
504 let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());
505
506 if let Some(peer_id) = active_peers_set.difference(tried_peers).next().cloned() {
507 tried_peers.insert(peer_id);
508 return Ok(peer_id);
509 }
510
511 sleep(loop_pause).await;
512 }
513
514 Err(sp_blockchain::Error::Backend(
515 "All connected peer retries failed".into(),
516 ))
517}