Skip to main content

sc_subspace_sync_common/
snap_sync_engine.rs

//! `SnapSyncingEngine` drives a one-off state download for snap sync: it requests the state of a
//! target block from the network and returns a block with that state, ready for import.
3
4use futures::StreamExt;
5use futures::channel::oneshot;
6use sc_client_api::ProofProvider;
7use sc_consensus::IncomingBlock;
8use sc_network::types::ProtocolName;
9use sc_network::{OutboundFailure, PeerId, RequestFailure};
10use sc_network_sync::pending_responses::{PendingResponses, ResponseEvent};
11use sc_network_sync::service::network::NetworkServiceHandle;
12use sc_network_sync::state_request_handler::generate_protocol_name;
13use sc_network_sync::strategy::SyncingAction;
14use sc_network_sync::strategy::state::StateStrategy;
15use sc_network_sync::types::BadPeer;
16use sp_blockchain::{Error as ClientError, HeaderBackend};
17use sp_runtime::traits::{Block as BlockT, NumberFor};
18use std::sync::Arc;
19use tracing::{debug, trace, warn};
20
21mod rep {
22    use sc_network::ReputationChange as Rep;
23    /// Peer is on unsupported protocol version.
24    pub(super) const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
25    /// Reputation change when a peer refuses a request.
26    pub(super) const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
27    /// Reputation change when a peer doesn't respond in time to our messages.
28    pub(super) const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
29    /// Reputation change when a peer connection failed with IO error.
30    pub(super) const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
31}
32
33pub struct SnapSyncingEngine<'a, Block>
34where
35    Block: BlockT,
36{
37    /// Syncing strategy
38    strategy: StateStrategy<Block>,
39    /// Pending responses
40    pending_responses: PendingResponses,
41    block_announces_protocol_name: ProtocolName,
42    network_service_handle: &'a NetworkServiceHandle,
43}
44
45impl<'a, Block> SnapSyncingEngine<'a, Block>
46where
47    Block: BlockT,
48{
49    pub fn new<Client>(
50        client: Arc<Client>,
51        fork_id: Option<&str>,
52        target_header: Block::Header,
53        skip_proof: bool,
54        current_sync_peer: (PeerId, NumberFor<Block>),
55        network_service_handle: &'a NetworkServiceHandle,
56    ) -> Result<Self, ClientError>
57    where
58        Client: HeaderBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
59    {
60        let genesis_hash = client.info().genesis_hash;
61        let block_announces_protocol_name = ProtocolName::from(if let Some(fork_id) = fork_id {
62            format!("/{}/{}/transactions/1", hex::encode(genesis_hash), fork_id)
63        } else {
64            format!("/{}/transactions/1", hex::encode(genesis_hash))
65        });
66
67        // Initialize syncing strategy.
68        let strategy = StateStrategy::new(
69            client,
70            target_header,
71            // We only care about the state, this value is just forwarded back into block to
72            // import that is thrown away below
73            None,
74            // We only care about the state, this value is just forwarded back into block to
75            // import that is thrown away below
76            None,
77            skip_proof,
78            vec![current_sync_peer].into_iter(),
79            ProtocolName::from(generate_protocol_name(genesis_hash, fork_id)),
80        );
81
82        Ok(Self {
83            strategy,
84            pending_responses: PendingResponses::new(),
85            block_announces_protocol_name,
86            network_service_handle,
87        })
88    }
89
90    // Downloads state and returns incoming block with state pre-populated and ready for importing
91    pub async fn download_state(mut self) -> Result<IncomingBlock<Block>, ClientError> {
92        debug!("Starting state downloading");
93
94        loop {
95            // Process actions requested by a syncing strategy.
96            let mut actions = self
97                .strategy
98                .actions(self.network_service_handle)
99                .peekable();
100            if actions.peek().is_none() {
101                return Err(ClientError::Backend(
102                    "Sync state download failed: no further actions".into(),
103                ));
104            }
105
106            for action in actions {
107                match action {
108                    SyncingAction::StartRequest {
109                        peer_id,
110                        key,
111                        request,
112                        // State sync doesn't use this
113                        remove_obsolete: _,
114                    } => {
115                        self.pending_responses.insert(peer_id, key, request);
116                    }
117                    SyncingAction::CancelRequest { .. } => {
118                        return Err(ClientError::Application(
119                            "Unexpected SyncingAction::CancelRequest".into(),
120                        ));
121                    }
122                    SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
123                        self.pending_responses
124                            .remove(peer_id, StateStrategy::<Block>::STRATEGY_KEY);
125
126                        trace!(%peer_id, "Peer dropped: {rep:?}");
127                    }
128                    SyncingAction::ImportBlocks { blocks, .. } => {
129                        return blocks.into_iter().next().ok_or_else(|| {
130                            ClientError::Application(
131                                "SyncingAction::ImportBlocks didn't contain any blocks to import"
132                                    .into(),
133                            )
134                        });
135                    }
136                    SyncingAction::ImportJustifications { .. } => {
137                        return Err(ClientError::Application(
138                            "Unexpected SyncingAction::ImportJustifications".into(),
139                        ));
140                    }
141                    SyncingAction::Finished => {
142                        return Err(ClientError::Backend(
143                            "Sync state finished without blocks to import".into(),
144                        ));
145                    }
146                }
147            }
148
149            let response_event = self.pending_responses.select_next_some().await;
150            self.process_response_event(response_event);
151        }
152    }
153
154    fn process_response_event(&mut self, response_event: ResponseEvent) {
155        let ResponseEvent {
156            peer_id,
157            key: _,
158            response: response_result,
159        } = response_event;
160
161        match response_result {
162            Ok(Ok((response, _protocol_name))) => {
163                let Ok(response) = response.downcast::<Vec<u8>>() else {
164                    warn!("Failed to downcast state response");
165                    debug_assert!(false);
166                    return;
167                };
168
169                self.strategy.on_state_response(&peer_id, *response);
170            }
171            Ok(Err(e)) => {
172                debug!("Request to peer {peer_id:?} failed: {e:?}.");
173
174                match e {
175                    RequestFailure::Network(OutboundFailure::Timeout) => {
176                        self.network_service_handle
177                            .report_peer(peer_id, rep::TIMEOUT);
178                        self.network_service_handle
179                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
180                    }
181                    RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
182                        self.network_service_handle
183                            .report_peer(peer_id, rep::BAD_PROTOCOL);
184                        self.network_service_handle
185                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
186                    }
187                    RequestFailure::Network(OutboundFailure::DialFailure) => {
188                        self.network_service_handle
189                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
190                    }
191                    RequestFailure::Refused => {
192                        self.network_service_handle
193                            .report_peer(peer_id, rep::REFUSED);
194                        self.network_service_handle
195                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
196                    }
197                    RequestFailure::Network(OutboundFailure::ConnectionClosed)
198                    | RequestFailure::NotConnected => {
199                        self.network_service_handle
200                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
201                    }
202                    RequestFailure::UnknownProtocol => {
203                        debug_assert!(false, "Block request protocol should always be known.");
204                    }
205                    RequestFailure::Obsolete => {
206                        debug_assert!(
207                            false,
208                            "Can not receive `RequestFailure::Obsolete` after dropping the \
209                            response receiver.",
210                        );
211                    }
212                    RequestFailure::Network(OutboundFailure::Io(_)) => {
213                        self.network_service_handle.report_peer(peer_id, rep::IO);
214                        self.network_service_handle
215                            .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
216                    }
217                }
218            }
219            Err(oneshot::Canceled) => {
220                trace!("Request to peer {peer_id:?} failed due to oneshot being canceled.");
221                self.network_service_handle
222                    .disconnect_peer(peer_id, self.block_announces_protocol_name.clone());
223            }
224        }
225    }
226}