sc_subspace_sync_common/
snap_sync_engine.rs

1//! `SyncingEngine` is the actor responsible for syncing Substrate chain
2//! to tip and keep the blockchain up to date with network updates.
3
4use futures::channel::oneshot;
5use futures::StreamExt;
6use sc_client_api::ProofProvider;
7use sc_consensus::IncomingBlock;
8use sc_network::types::ProtocolName;
9use sc_network::{OutboundFailure, PeerId, RequestFailure};
10use sc_network_sync::pending_responses::{PendingResponses, ResponseEvent};
11use sc_network_sync::service::network::NetworkServiceHandle;
12use sc_network_sync::state_request_handler::generate_protocol_name;
13use sc_network_sync::strategy::state::StateStrategy;
14use sc_network_sync::strategy::SyncingAction;
15use sc_network_sync::types::BadPeer;
16use sp_blockchain::{Error as ClientError, HeaderBackend};
17use sp_runtime::traits::{Block as BlockT, NumberFor};
18use std::sync::Arc;
19use tracing::{debug, trace, warn};
20
21mod rep {
22    use sc_network::ReputationChange as Rep;
23    /// Peer is on unsupported protocol version.
24    pub(super) const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
25    /// Reputation change when a peer refuses a request.
26    pub(super) const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
27    /// Reputation change when a peer doesn't respond in time to our messages.
28    pub(super) const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
29    /// Reputation change when a peer connection failed with IO error.
30    pub(super) const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
31}
32
33pub struct SnapSyncingEngine<'a, Block>
34where
35    Block: BlockT,
36{
37    /// Syncing strategy
38    strategy: StateStrategy<Block>,
39    /// Pending responses
40    pending_responses: PendingResponses,
41    block_announces_protocol_name: ProtocolName,
42    network_service_handle: &'a NetworkServiceHandle,
43}
44
45impl<'a, Block> SnapSyncingEngine<'a, Block>
46where
47    Block: BlockT,
48{
49    pub fn new<Client>(
50        client: Arc<Client>,
51        fork_id: Option<&str>,
52        target_header: Block::Header,
53        skip_proof: bool,
54        current_sync_peer: (PeerId, NumberFor<Block>),
55        network_service_handle: &'a NetworkServiceHandle,
56    ) -> Result<Self, ClientError>
57    where
58        Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
59    {
60        let genesis_hash = client.info().genesis_hash;
61        let block_announces_protocol_name = ProtocolName::from(if let Some(fork_id) = fork_id {
62            format!(
63                "/{}/{}/transactions/1",
64                array_bytes::bytes2hex("", genesis_hash),
65                fork_id
66            )
67        } else {
68            format!(
69                "/{}/transactions/1",
70                array_bytes::bytes2hex("", genesis_hash)
71            )
72        });
73
74        // Initialize syncing strategy.
75        let strategy = StateStrategy::new(
76            client,
77            target_header,
78            // We only care about the state, this value is just forwarded back into block to
79            // import that is thrown away below
80            None,
81            // We only care about the state, this value is just forwarded back into block to
82            // import that is thrown away below
83            None,
84            skip_proof,
85            vec![current_sync_peer].into_iter(),
86            ProtocolName::from(generate_protocol_name(genesis_hash, fork_id)),
87        );
88
89        Ok(Self {
90            strategy,
91            pending_responses: PendingResponses::new(),
92            block_announces_protocol_name,
93            network_service_handle,
94        })
95    }
96
97    // Downloads state and returns incoming block with state pre-populated and ready for importing
98    pub async fn download_state(mut self) -> Result<IncomingBlock<Block>, ClientError> {
99        debug!("Starting state downloading");
100
101        loop {
102            // Process actions requested by a syncing strategy.
103            let mut actions = self
104                .strategy
105                .actions(self.network_service_handle)
106                .peekable();
107            if actions.peek().is_none() {
108                return Err(ClientError::Backend(
109                    "Sync state download failed: no further actions".into(),
110                ));
111            }
112
113            for action in actions {
114                match action {
115                    SyncingAction::StartRequest {
116                        peer_id,
117                        key,
118                        request,
119                        // State sync doesn't use this
120                        remove_obsolete: _,
121                    } => {
122                        self.pending_responses.insert(peer_id, key, request);
123                    }
124                    SyncingAction::CancelRequest { .. } => {
125                        return Err(ClientError::Application(
126                            "Unexpected SyncingAction::CancelRequest".into(),
127                        ));
128                    }
129                    SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
130                        self.pending_responses
131                            .remove(peer_id, StateStrategy::<Block>::STRATEGY_KEY);
132
133                        trace!(%peer_id, "Peer dropped: {rep:?}");
134                    }
135                    SyncingAction::ImportBlocks { blocks, .. } => {
136                        return blocks.into_iter().next().ok_or_else(|| {
137                            ClientError::Application(
138                                "SyncingAction::ImportBlocks didn't contain any blocks to import"
139                                    .into(),
140                            )
141                        });
142                    }
143                    SyncingAction::ImportJustifications { .. } => {
144                        return Err(ClientError::Application(
145                            "Unexpected SyncingAction::ImportJustifications".into(),
146                        ));
147                    }
148                    SyncingAction::Finished => {
149                        return Err(ClientError::Backend(
150                            "Sync state finished without blocks to import".into(),
151                        ));
152                    }
153                }
154            }
155
156            let response_event = self.pending_responses.select_next_some().await;
157            self.process_response_event(response_event);
158        }
159    }
160
161    fn process_response_event(&mut self, response_event: ResponseEvent) {
162        let ResponseEvent {
163            peer_id,
164            key: _,
165            response: response_result,
166        } = response_event;
167
168        match response_result {
169            Ok(Ok((response, _protocol_name))) => {
170                let Ok(response) = response.downcast::<Vec<u8>>() else {
171                    warn!("Failed to downcast state response");
172                    debug_assert!(false);
173                    return;
174                };
175
176                self.strategy.on_state_response(&peer_id, *response);
177            }
178            Ok(Err(e)) => {
179                debug!("Request to peer {peer_id:?} failed: {e:?}.");
180
181                match e {
182                    RequestFailure::Network(OutboundFailure::Timeout) => {
183                        self.network_service_handle
184                            .report_peer(peer_id, rep::TIMEOUT);
185                        self.network_service_handle
186                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
187                    }
188                    RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
189                        self.network_service_handle
190                            .report_peer(peer_id, rep::BAD_PROTOCOL);
191                        self.network_service_handle
192                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
193                    }
194                    RequestFailure::Network(OutboundFailure::DialFailure) => {
195                        self.network_service_handle
196                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
197                    }
198                    RequestFailure::Refused => {
199                        self.network_service_handle
200                            .report_peer(peer_id, rep::REFUSED);
201                        self.network_service_handle
202                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
203                    }
204                    RequestFailure::Network(OutboundFailure::ConnectionClosed)
205                    | RequestFailure::NotConnected => {
206                        self.network_service_handle
207                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
208                    }
209                    RequestFailure::UnknownProtocol => {
210                        debug_assert!(false, "Block request protocol should always be known.");
211                    }
212                    RequestFailure::Obsolete => {
213                        debug_assert!(
214                            false,
215                            "Can not receive `RequestFailure::Obsolete` after dropping the \
216                            response receiver.",
217                        );
218                    }
219                    RequestFailure::Network(OutboundFailure::Io(_)) => {
220                        self.network_service_handle.report_peer(peer_id, rep::IO);
221                        self.network_service_handle
222                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
223                    }
224                }
225            }
226            Err(oneshot::Canceled) => {
227                trace!("Request to peer {peer_id:?} failed due to oneshot being canceled.");
228                self.network_service_handle
229                    .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
230            }
231        }
232    }
233}