sc_subspace_sync_common/
snap_sync_engine.rs1use futures::channel::oneshot;
5use futures::StreamExt;
6use sc_client_api::ProofProvider;
7use sc_consensus::IncomingBlock;
8use sc_network::types::ProtocolName;
9use sc_network::{OutboundFailure, PeerId, RequestFailure};
10use sc_network_sync::pending_responses::{PendingResponses, ResponseEvent};
11use sc_network_sync::service::network::NetworkServiceHandle;
12use sc_network_sync::state_request_handler::generate_protocol_name;
13use sc_network_sync::strategy::state::StateStrategy;
14use sc_network_sync::strategy::SyncingAction;
15use sc_network_sync::types::BadPeer;
16use sp_blockchain::{Error as ClientError, HeaderBackend};
17use sp_runtime::traits::{Block as BlockT, NumberFor};
18use std::sync::Arc;
19use tracing::{debug, trace, warn};
20
21mod rep {
22 use sc_network::ReputationChange as Rep;
23 pub(super) const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
25 pub(super) const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
27 pub(super) const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
29 pub(super) const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
31}
32
33pub struct SnapSyncingEngine<'a, Block>
34where
35 Block: BlockT,
36{
37 strategy: StateStrategy<Block>,
39 pending_responses: PendingResponses,
41 block_announces_protocol_name: ProtocolName,
42 network_service_handle: &'a NetworkServiceHandle,
43}
44
45impl<'a, Block> SnapSyncingEngine<'a, Block>
46where
47 Block: BlockT,
48{
49 pub fn new<Client>(
50 client: Arc<Client>,
51 fork_id: Option<&str>,
52 target_header: Block::Header,
53 skip_proof: bool,
54 current_sync_peer: (PeerId, NumberFor<Block>),
55 network_service_handle: &'a NetworkServiceHandle,
56 ) -> Result<Self, ClientError>
57 where
58 Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
59 {
60 let genesis_hash = client.info().genesis_hash;
61 let block_announces_protocol_name = ProtocolName::from(if let Some(fork_id) = fork_id {
62 format!(
63 "/{}/{}/transactions/1",
64 array_bytes::bytes2hex("", genesis_hash),
65 fork_id
66 )
67 } else {
68 format!(
69 "/{}/transactions/1",
70 array_bytes::bytes2hex("", genesis_hash)
71 )
72 });
73
74 let strategy = StateStrategy::new(
76 client,
77 target_header,
78 None,
81 None,
84 skip_proof,
85 vec![current_sync_peer].into_iter(),
86 ProtocolName::from(generate_protocol_name(genesis_hash, fork_id)),
87 );
88
89 Ok(Self {
90 strategy,
91 pending_responses: PendingResponses::new(),
92 block_announces_protocol_name,
93 network_service_handle,
94 })
95 }
96
97 pub async fn download_state(mut self) -> Result<IncomingBlock<Block>, ClientError> {
99 debug!("Starting state downloading");
100
101 loop {
102 let mut actions = self
104 .strategy
105 .actions(self.network_service_handle)
106 .peekable();
107 if actions.peek().is_none() {
108 return Err(ClientError::Backend(
109 "Sync state download failed: no further actions".into(),
110 ));
111 }
112
113 for action in actions {
114 match action {
115 SyncingAction::StartRequest {
116 peer_id,
117 key,
118 request,
119 remove_obsolete: _,
121 } => {
122 self.pending_responses.insert(peer_id, key, request);
123 }
124 SyncingAction::CancelRequest { .. } => {
125 return Err(ClientError::Application(
126 "Unexpected SyncingAction::CancelRequest".into(),
127 ));
128 }
129 SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
130 self.pending_responses
131 .remove(peer_id, StateStrategy::<Block>::STRATEGY_KEY);
132
133 trace!(%peer_id, "Peer dropped: {rep:?}");
134 }
135 SyncingAction::ImportBlocks { blocks, .. } => {
136 return blocks.into_iter().next().ok_or_else(|| {
137 ClientError::Application(
138 "SyncingAction::ImportBlocks didn't contain any blocks to import"
139 .into(),
140 )
141 });
142 }
143 SyncingAction::ImportJustifications { .. } => {
144 return Err(ClientError::Application(
145 "Unexpected SyncingAction::ImportJustifications".into(),
146 ));
147 }
148 SyncingAction::Finished => {
149 return Err(ClientError::Backend(
150 "Sync state finished without blocks to import".into(),
151 ));
152 }
153 }
154 }
155
156 let response_event = self.pending_responses.select_next_some().await;
157 self.process_response_event(response_event);
158 }
159 }
160
161 fn process_response_event(&mut self, response_event: ResponseEvent) {
162 let ResponseEvent {
163 peer_id,
164 key: _,
165 response: response_result,
166 } = response_event;
167
168 match response_result {
169 Ok(Ok((response, _protocol_name))) => {
170 let Ok(response) = response.downcast::<Vec<u8>>() else {
171 warn!("Failed to downcast state response");
172 debug_assert!(false);
173 return;
174 };
175
176 self.strategy.on_state_response(&peer_id, *response);
177 }
178 Ok(Err(e)) => {
179 debug!("Request to peer {peer_id:?} failed: {e:?}.");
180
181 match e {
182 RequestFailure::Network(OutboundFailure::Timeout) => {
183 self.network_service_handle
184 .report_peer(peer_id, rep::TIMEOUT);
185 self.network_service_handle
186 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
187 }
188 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
189 self.network_service_handle
190 .report_peer(peer_id, rep::BAD_PROTOCOL);
191 self.network_service_handle
192 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
193 }
194 RequestFailure::Network(OutboundFailure::DialFailure) => {
195 self.network_service_handle
196 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
197 }
198 RequestFailure::Refused => {
199 self.network_service_handle
200 .report_peer(peer_id, rep::REFUSED);
201 self.network_service_handle
202 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
203 }
204 RequestFailure::Network(OutboundFailure::ConnectionClosed)
205 | RequestFailure::NotConnected => {
206 self.network_service_handle
207 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
208 }
209 RequestFailure::UnknownProtocol => {
210 debug_assert!(false, "Block request protocol should always be known.");
211 }
212 RequestFailure::Obsolete => {
213 debug_assert!(
214 false,
215 "Can not receive `RequestFailure::Obsolete` after dropping the \
216 response receiver.",
217 );
218 }
219 RequestFailure::Network(OutboundFailure::Io(_)) => {
220 self.network_service_handle.report_peer(peer_id, rep::IO);
221 self.network_service_handle
222 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
223 }
224 }
225 }
226 Err(oneshot::Canceled) => {
227 trace!("Request to peer {peer_id:?} failed due to oneshot being canceled.");
228 self.network_service_handle
229 .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
230 }
231 }
232 }
233}