1use domain_runtime_primitives::{Balance, BlockNumber};
2use futures::channel::mpsc;
3use futures::{SinkExt, Stream, StreamExt};
4use sc_client_api::{AuxStore, BlockchainEvents, ProofProvider};
5use sc_consensus::{
6 BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, StateAction, StorageChanges,
7};
8use sc_network::PeerId;
9use sc_network_common::sync::message::{
10 BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
11};
12use sc_network_sync::block_relay_protocol::BlockDownloader;
13use sc_network_sync::service::network::NetworkServiceHandle;
14use sc_network_sync::SyncingService;
15use sc_subspace_sync_common::snap_sync_engine::SnapSyncingEngine;
16use sp_blockchain::HeaderBackend;
17use sp_consensus::BlockOrigin;
18use sp_domains::ExecutionReceiptFor;
19use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Header, NumberFor};
20use std::collections::HashSet;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24use tokio::sync::broadcast;
25use tokio::time::sleep;
26use tracing::{debug, error, trace, Instrument};
27
/// Tracing target attached to the log events emitted by the domain snap sync code in this file.
pub(crate) const LOG_TARGET: &str = "domain_snap_sync";
29
30#[derive(Debug, Clone)]
33pub struct BlockImportingAcknowledgement<Block>
34where
35 Block: BlockT,
36{
37 pub block_number: NumberFor<Block>,
39 pub acknowledgement_sender: mpsc::Sender<()>,
42}
43
44pub struct ConsensusChainSyncParams<Block, DomainHeader>
46where
47 Block: BlockT,
48 DomainHeader: Header,
49{
50 pub snap_sync_orchestrator: Arc<SnapSyncOrchestrator>,
52 pub last_domain_block_er: ExecutionReceiptFor<DomainHeader, Block, Balance>,
54 pub block_importing_notification_stream:
56 Box<dyn Stream<Item = BlockImportingAcknowledgement<Block>> + Sync + Send + Unpin>,
57}
58
59pub struct SnapSyncOrchestrator {
61 consensus_snap_sync_target_block_tx: broadcast::Sender<BlockNumber>,
62 domain_snap_sync_finished: Arc<AtomicBool>,
63}
64
65impl Default for SnapSyncOrchestrator {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl SnapSyncOrchestrator {
72 pub fn new() -> Self {
74 let (tx, _) = broadcast::channel(1);
75 Self {
76 consensus_snap_sync_target_block_tx: tx,
77 domain_snap_sync_finished: Arc::new(AtomicBool::new(false)),
78 }
79 }
80
81 pub fn unblock_consensus_snap_sync(&self, target_block_number: BlockNumber) {
83 debug!(target: LOG_TARGET, %target_block_number, "Allowed starting consensus chain snap sync.");
84
85 let target_block_send_result = self
86 .consensus_snap_sync_target_block_tx
87 .send(target_block_number);
88
89 debug!(
90 target: LOG_TARGET,
91 ?target_block_send_result,
92 "Target block sending result: {target_block_number}"
93 );
94 }
95
96 pub fn domain_snap_sync_finished(&self) -> Arc<AtomicBool> {
98 self.domain_snap_sync_finished.clone()
99 }
100
101 pub fn consensus_snap_sync_target_block_receiver(&self) -> broadcast::Receiver<BlockNumber> {
103 self.consensus_snap_sync_target_block_tx.subscribe()
104 }
105
106 pub fn mark_domain_snap_sync_finished(&self) {
108 debug!(target: LOG_TARGET, "Signal that domain snap sync finished.");
109 self.domain_snap_sync_finished
110 .store(true, Ordering::Release);
111 }
112}
113
114pub struct SyncParams<DomainClient, Block, CBlock>
115where
116 Block: BlockT,
117 CBlock: BlockT,
118{
119 pub domain_client: Arc<DomainClient>,
120 pub sync_service: Arc<SyncingService<Block>>,
121 pub domain_fork_id: Option<String>,
122 pub domain_network_service_handle: NetworkServiceHandle,
123 pub domain_block_downloader: Arc<dyn BlockDownloader<Block>>,
124 pub consensus_chain_sync_params: ConsensusChainSyncParams<CBlock, Block::Header>,
125 pub challenge_period: NumberFor<CBlock>,
126}
127
128async fn get_last_confirmed_block<Block: BlockT>(
129 block_downloader: Arc<dyn BlockDownloader<Block>>,
130 sync_service: &SyncingService<Block>,
131 block_number: BlockNumber,
132) -> Result<BlockData<Block>, sp_blockchain::Error> {
133 const LAST_CONFIRMED_BLOCK_RETRIES: u32 = 5;
134 const LOOP_PAUSE: Duration = Duration::from_secs(20);
135 const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
136
137 for attempt in 1..=LAST_CONFIRMED_BLOCK_RETRIES {
138 debug!(target: LOG_TARGET, %attempt, %block_number, "Starting last confirmed block request...");
139
140 debug!(target: LOG_TARGET, %block_number, "Gathering peers for last confirmed block request.");
141 let mut tried_peers = HashSet::<PeerId>::new();
142
143 let current_peer_id = match get_currently_connected_peer(
144 sync_service,
145 &mut tried_peers,
146 LOOP_PAUSE,
147 MAX_GET_PEERS_ATTEMPT_NUMBER,
148 )
149 .instrument(tracing::info_span!("last confirmed block"))
150 .await
151 {
152 Ok(peer_id) => peer_id,
153 Err(err) => {
154 debug!(target: LOG_TARGET, ?err, "Getting peers for the last confirmed block failed");
155 continue;
156 }
157 };
158 tried_peers.insert(current_peer_id);
159
160 let id = {
161 let now = SystemTime::now();
162 let duration_since_epoch = now
163 .duration_since(UNIX_EPOCH)
164 .expect("Time usually goes forward");
165
166 duration_since_epoch.as_nanos() as u64
167 };
168
169 let block_request = BlockRequest::<Block> {
170 id,
171 direction: Direction::Ascending,
172 from: FromBlock::Number(block_number.into()),
173 max: Some(1),
174 fields: BlockAttributes::HEADER
175 | BlockAttributes::JUSTIFICATION
176 | BlockAttributes::BODY
177 | BlockAttributes::RECEIPT
178 | BlockAttributes::MESSAGE_QUEUE
179 | BlockAttributes::INDEXED_BODY,
180 };
181 let block_response_result = block_downloader
182 .download_blocks(current_peer_id, block_request.clone())
183 .await;
184
185 match block_response_result {
186 Ok(block_response_inner_result) => {
187 trace!(
188 target: LOG_TARGET,
189 %block_number,
190 "Sync worker handle result: {:?}",
191 block_response_inner_result.as_ref().map(|(block_data, protocol_name)| (hex::encode(block_data), protocol_name))
192 );
193
194 match block_response_inner_result {
195 Ok(data) => {
196 match block_downloader.block_response_into_blocks(&block_request, data.0) {
197 Ok(mut blocks) => {
198 trace!(target: LOG_TARGET, %block_number, "Domain block parsing result: {:?}", blocks);
199
200 if let Some(blocks) = blocks.pop() {
201 return Ok(blocks);
202 } else {
203 trace!(target: LOG_TARGET, %current_peer_id, "Got empty state blocks",);
204 continue;
205 }
206 }
207 Err(error) => {
208 error!(target: LOG_TARGET, %block_number, ?error, "Domain block parsing error");
209 continue;
210 }
211 }
212 }
213 Err(error) => {
214 error!(target: LOG_TARGET, %block_number, ?error, "Domain block sync error (inner)");
215 continue;
216 }
217 }
218 }
219 Err(error) => {
220 error!(target: LOG_TARGET, %block_number, ?error, "Domain block sync error");
221 continue;
222 }
223 }
224 }
225
226 Err(sp_blockchain::Error::Application(
227 format!("Failed to get block {}", block_number).into(),
228 ))
229}
230
231fn convert_block_number<Block: BlockT>(block_number: NumberFor<Block>) -> u32 {
232 let block_number: u32 = match block_number.try_into() {
233 Ok(block_number) => block_number,
234 Err(_) => {
235 panic!("Can't convert block number.")
236 }
237 };
238
239 block_number
240}
241
242pub(crate) async fn snap_sync<Block, Client, CBlock>(
243 sync_params: SyncParams<Client, Block, CBlock>,
244) -> Result<(), sp_blockchain::Error>
245where
246 Block: BlockT,
247 Client: HeaderBackend<Block>
248 + BlockImport<Block>
249 + AuxStore
250 + ProofProvider<Block>
251 + BlockchainEvents<Block>
252 + Send
253 + Sync
254 + 'static,
255 for<'a> &'a Client: BlockImport<Block>,
256 CBlock: BlockT,
257{
258 let last_confirmed_block_receipt = sync_params.consensus_chain_sync_params.last_domain_block_er;
259
260 if last_confirmed_block_receipt.domain_block_number == 0u32.into() {
262 return Err(sp_blockchain::Error::Application(
263 "Can't snap sync from genesis.".into(),
264 ));
265 }
266
267 let consensus_block_hash = last_confirmed_block_receipt.consensus_block_hash;
268
269 let mut block_importing_notification_stream = sync_params
270 .consensus_chain_sync_params
271 .block_importing_notification_stream;
272
273 let mut consensus_target_block_acknowledgement_sender = None;
274 while let Some(mut block_notification) = block_importing_notification_stream.next().await {
275 if block_notification.block_number <= last_confirmed_block_receipt.consensus_block_number {
276 if block_notification
277 .acknowledgement_sender
278 .send(())
279 .await
280 .is_err()
281 {
282 return Err(sp_blockchain::Error::Application(
283 format!(
284 "Can't acknowledge block import #{}",
285 block_notification.block_number
286 )
287 .into(),
288 ));
289 };
290 } else {
291 consensus_target_block_acknowledgement_sender
292 .replace(block_notification.acknowledgement_sender);
293 break;
294 }
295 }
296
297 let domain_block_number =
298 convert_block_number::<Block>(last_confirmed_block_receipt.domain_block_number);
299
300 let domain_block_hash = last_confirmed_block_receipt.domain_block_hash;
301 let domain_block = get_last_confirmed_block(
302 sync_params.domain_block_downloader,
303 &sync_params.sync_service,
304 domain_block_number,
305 )
306 .await?;
307
308 let Some(domain_block_header) = domain_block.header else {
309 return Err(sp_blockchain::Error::MissingHeader(
310 "Can't obtain domain block header for snap sync".to_string(),
311 ));
312 };
313
314 let state_result = download_state(
315 &domain_block_header,
316 &sync_params.domain_client,
317 sync_params.domain_fork_id,
318 &sync_params.domain_network_service_handle,
319 &sync_params.sync_service,
320 )
321 .await;
322
323 trace!(target: LOG_TARGET, "State downloaded: {:?}", state_result);
324
325 {
326 let client = sync_params.domain_client.clone();
327 let mut block =
329 BlockImportParams::new(BlockOrigin::NetworkInitialSync, domain_block_header);
330 block.body = domain_block.body;
331 block.justifications = domain_block.justifications;
332 block.state_action = StateAction::ApplyChanges(StorageChanges::Import(state_result?));
333 block.finalized = true;
334 block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
335 client.as_ref().import_block(block).await.map_err(|error| {
336 sp_blockchain::Error::Backend(format!("Failed to import state block: {error}"))
337 })?;
338 }
339
340 trace!(
341 target: LOG_TARGET,
342 "Domain client info after waiting: {:?}",
343 sync_params.domain_client.info()
344 );
345
346 if let Ok(Some(created_domain_block_hash)) =
348 sync_params.domain_client.hash(domain_block_number.into())
349 {
350 if created_domain_block_hash == domain_block_hash {
351 trace!(
352 target: LOG_TARGET,
353 ?created_domain_block_hash,
354 ?domain_block_hash,
355 "Created hash matches after the domain block import with state",
356 );
357 } else {
358 debug!(
359 target: LOG_TARGET,
360 ?created_domain_block_hash,
361 ?domain_block_hash,
362 "Created hash doesn't match after the domain block import with state",
363 );
364
365 return Err(sp_blockchain::Error::Backend(
366 "Created hash doesn't match after the domain block import with state".to_string(),
367 ));
368 }
369 } else {
370 return Err(sp_blockchain::Error::Backend(
371 "Can't obtain domain block hash after state importing for snap sync".to_string(),
372 ));
373 }
374
375 crate::aux_schema::track_domain_hash_and_consensus_hash::<_, Block, CBlock>(
376 &sync_params.domain_client,
377 domain_block_hash,
378 consensus_block_hash,
379 false,
382 )?;
383
384 crate::aux_schema::write_execution_receipt::<_, Block, CBlock>(
385 sync_params.domain_client.as_ref(),
386 None,
387 &last_confirmed_block_receipt,
388 sync_params.challenge_period,
389 )?;
390
391 sync_params
392 .consensus_chain_sync_params
393 .snap_sync_orchestrator
394 .mark_domain_snap_sync_finished();
395
396 debug!(target: LOG_TARGET, info = ?sync_params.domain_client.info(), "Client info after successful domain snap sync.");
397
398 drop(consensus_target_block_acknowledgement_sender);
400 drop(block_importing_notification_stream);
401
402 Ok(())
403}
404
405async fn download_state<Block, Client>(
407 header: &Block::Header,
408 client: &Arc<Client>,
409 fork_id: Option<String>,
410 network_service_handle: &NetworkServiceHandle,
411 sync_service: &SyncingService<Block>,
412) -> Result<ImportedState<Block>, sp_blockchain::Error>
413where
414 Block: BlockT,
415 Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
416{
417 let block_number = *header.number();
418
419 const STATE_SYNC_RETRIES: u32 = 5;
420 const LOOP_PAUSE: Duration = Duration::from_secs(20);
421 const MAX_GET_PEERS_ATTEMPT_NUMBER: usize = 30;
422
423 for attempt in 1..=STATE_SYNC_RETRIES {
424 debug!(target: LOG_TARGET, %block_number, %attempt, "Starting state sync...");
425
426 debug!(target: LOG_TARGET, %block_number, "Gathering peers for state sync.");
427 let mut tried_peers = HashSet::<PeerId>::new();
428
429 let current_peer_id = match get_currently_connected_peer(
430 sync_service,
431 &mut tried_peers,
432 LOOP_PAUSE,
433 MAX_GET_PEERS_ATTEMPT_NUMBER,
434 )
435 .instrument(tracing::info_span!("download state"))
436 .await
437 {
438 Ok(peer_id) => peer_id,
439 Err(err) => {
440 debug!(?err, "Getting peers for state downloading failed");
441 continue;
442 }
443 };
444 tried_peers.insert(current_peer_id);
445
446 let sync_engine = SnapSyncingEngine::<Block>::new(
447 client.clone(),
448 fork_id.as_deref(),
449 header.clone(),
450 false,
451 (current_peer_id, block_number),
452 network_service_handle,
453 )?;
454
455 let last_block_from_sync_result = sync_engine.download_state().await;
456
457 match last_block_from_sync_result {
458 Ok(block_to_import) => {
459 debug!(target: LOG_TARGET, %block_number, "Sync worker handle result: {:?}", block_to_import);
460
461 return block_to_import.state.ok_or_else(|| {
462 sp_blockchain::Error::Backend(
463 "Imported state was missing in synced block".into(),
464 )
465 });
466 }
467 Err(error) => {
468 error!(target: LOG_TARGET, %block_number, %error, "State sync error");
469 continue;
470 }
471 }
472 }
473
474 Err(sp_blockchain::Error::Backend(
475 "All snap sync retries failed".into(),
476 ))
477}
478
479async fn get_currently_connected_peer<Block>(
480 sync_service: &SyncingService<Block>,
481 tried_peers: &mut HashSet<PeerId>,
482 loop_pause: Duration,
483 max_attempts: usize,
484) -> Result<PeerId, sp_blockchain::Error>
485where
486 Block: BlockT,
487{
488 for current_attempt in 0..max_attempts {
489 let all_connected_peers = sync_service
490 .peers_info()
491 .await
492 .expect("Network service must be available.");
493
494 debug!(
495 target: LOG_TARGET,
496 %current_attempt,
497 ?all_connected_peers,
498 "Connected peers"
499 );
500
501 let connected_full_peers = all_connected_peers
502 .iter()
503 .filter_map(|(peer_id, info)| (info.roles.is_full()).then_some(*peer_id))
504 .collect::<Vec<_>>();
505
506 debug!(
507 target: LOG_TARGET,
508 %current_attempt,
509 ?tried_peers,
510 "Sync peers: {:?}", connected_full_peers
511 );
512
513 let active_peers_set = HashSet::from_iter(connected_full_peers.into_iter());
514
515 if let Some(peer_id) = active_peers_set.difference(tried_peers).next().cloned() {
516 tried_peers.insert(peer_id);
517 return Ok(peer_id);
518 }
519
520 sleep(loop_pause).await;
521 }
522
523 Err(sp_blockchain::Error::Backend(
524 "All connected peer retries failed".into(),
525 ))
526}