sc_subspace_sync_common/
snap_sync_engine.rs1use futures::StreamExt;
5use futures::channel::oneshot;
6use sc_client_api::ProofProvider;
7use sc_consensus::IncomingBlock;
8use sc_network::types::ProtocolName;
9use sc_network::{OutboundFailure, PeerId, RequestFailure};
10use sc_network_sync::pending_responses::{PendingResponses, ResponseEvent};
11use sc_network_sync::service::network::NetworkServiceHandle;
12use sc_network_sync::state_request_handler::generate_protocol_name;
13use sc_network_sync::strategy::SyncingAction;
14use sc_network_sync::strategy::state::StateStrategy;
15use sc_network_sync::types::BadPeer;
16use sp_blockchain::{Error as ClientError, HeaderBackend};
17use sp_runtime::traits::{Block as BlockT, NumberFor};
18use std::sync::Arc;
19use tracing::{debug, trace, warn};
20
21mod rep {
22 use sc_network::ReputationChange as Rep;
23 pub(super) const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
25 pub(super) const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
27 pub(super) const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
29 pub(super) const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
31}
32
33pub struct SnapSyncingEngine<'a, Block>
34where
35 Block: BlockT,
36{
37 strategy: StateStrategy<Block>,
39 pending_responses: PendingResponses,
41 block_announces_protocol_name: ProtocolName,
42 network_service_handle: &'a NetworkServiceHandle,
43}
44
45impl<'a, Block> SnapSyncingEngine<'a, Block>
46where
47 Block: BlockT,
48{
49 pub fn new<Client>(
50 client: Arc<Client>,
51 fork_id: Option<&str>,
52 target_header: Block::Header,
53 skip_proof: bool,
54 current_sync_peer: (PeerId, NumberFor<Block>),
55 network_service_handle: &'a NetworkServiceHandle,
56 ) -> Result<Self, ClientError>
57 where
58 Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
59 {
60 let genesis_hash = client.info().genesis_hash;
61 let block_announces_protocol_name = ProtocolName::from(if let Some(fork_id) = fork_id {
62 format!("/{}/{}/transactions/1", hex::encode(genesis_hash), fork_id)
63 } else {
64 format!("/{}/transactions/1", hex::encode(genesis_hash))
65 });
66
67 let strategy = StateStrategy::new(
69 client,
70 target_header,
71 None,
74 None,
77 skip_proof,
78 vec![current_sync_peer].into_iter(),
79 ProtocolName::from(generate_protocol_name(genesis_hash, fork_id)),
80 );
81
82 Ok(Self {
83 strategy,
84 pending_responses: PendingResponses::new(),
85 block_announces_protocol_name,
86 network_service_handle,
87 })
88 }
89
90 pub async fn download_state(mut self) -> Result<IncomingBlock<Block>, ClientError> {
92 debug!("Starting state downloading");
93
94 loop {
95 let mut actions = self
97 .strategy
98 .actions(self.network_service_handle)
99 .peekable();
100 if actions.peek().is_none() {
101 return Err(ClientError::Backend(
102 "Sync state download failed: no further actions".into(),
103 ));
104 }
105
106 for action in actions {
107 match action {
108 SyncingAction::StartRequest {
109 peer_id,
110 key,
111 request,
112 remove_obsolete: _,
114 } => {
115 self.pending_responses.insert(peer_id, key, request);
116 }
117 SyncingAction::CancelRequest { .. } => {
118 return Err(ClientError::Application(
119 "Unexpected SyncingAction::CancelRequest".into(),
120 ));
121 }
122 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
123 self.pending_responses
124 .remove(peer_id, StateStrategy::<Block>::STRATEGY_KEY);
125
126 trace!(%peer_id, "Peer dropped: {rep:?}");
127 }
128 SyncingAction::ImportBlocks { blocks, .. } => {
129 return blocks.into_iter().next().ok_or_else(|| {
130 ClientError::Application(
131 "SyncingAction::ImportBlocks didn't contain any blocks to import"
132 .into(),
133 )
134 });
135 }
136 SyncingAction::ImportJustifications { .. } => {
137 return Err(ClientError::Application(
138 "Unexpected SyncingAction::ImportJustifications".into(),
139 ));
140 }
141 SyncingAction::Finished => {
142 return Err(ClientError::Backend(
143 "Sync state finished without blocks to import".into(),
144 ));
145 }
146 }
147 }
148
149 let response_event = self.pending_responses.select_next_some().await;
150 self.process_response_event(response_event);
151 }
152 }
153
154 fn process_response_event(&mut self, response_event: ResponseEvent) {
155 let ResponseEvent {
156 peer_id,
157 key: _,
158 response: response_result,
159 } = response_event;
160
161 match response_result {
162 Ok(Ok((response, _protocol_name))) => {
163 let Ok(response) = response.downcast::<Vec<u8>>() else {
164 warn!("Failed to downcast state response");
165 debug_assert!(false);
166 return;
167 };
168
169 self.strategy.on_state_response(&peer_id, *response);
170 }
171 Ok(Err(e)) => {
172 debug!("Request to peer {peer_id:?} failed: {e:?}.");
173
174 match e {
175 RequestFailure::Network(OutboundFailure::Timeout) => {
176 self.network_service_handle
177 .report_peer(peer_id, rep::TIMEOUT);
178 self.network_service_handle
179 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
180 }
181 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
182 self.network_service_handle
183 .report_peer(peer_id, rep::BAD_PROTOCOL);
184 self.network_service_handle
185 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
186 }
187 RequestFailure::Network(OutboundFailure::DialFailure) => {
188 self.network_service_handle
189 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
190 }
191 RequestFailure::Refused => {
192 self.network_service_handle
193 .report_peer(peer_id, rep::REFUSED);
194 self.network_service_handle
195 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
196 }
197 RequestFailure::Network(OutboundFailure::ConnectionClosed)
198 | RequestFailure::NotConnected => {
199 self.network_service_handle
200 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
201 }
202 RequestFailure::UnknownProtocol => {
203 debug_assert!(false, "Block request protocol should always be known.");
204 }
205 RequestFailure::Obsolete => {
206 debug_assert!(
207 false,
208 "Can not receive `RequestFailure::Obsolete` after dropping the \
209 response receiver.",
210 );
211 }
212 RequestFailure::Network(OutboundFailure::Io(_)) => {
213 self.network_service_handle.report_peer(peer_id, rep::IO);
214 self.network_service_handle
215 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
216 }
217 }
218 }
219 Err(oneshot::Canceled) => {
220 trace!("Request to peer {peer_id:?} failed due to oneshot being canceled.");
221 self.network_service_handle
222 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
223 }
224 }
225 }
226}