sc_subspace_block_relay/protocol/
compact_block.rs

1//! Compact block implementation.
2
3use crate::protocol::{ClientBackend, ProtocolUnitInfo, Resolved, ServerBackend};
4use crate::types::RelayError;
5use crate::utils::NetworkPeerHandle;
6use derive_more::From;
7use parity_scale_codec::{Decode, Encode};
8use std::collections::BTreeMap;
9use tracing::{trace, warn};
10
11/// The initial request(currently we don't need to do send anything
12/// as part of the initial download request for compact blocks).
13#[derive(From, Encode, Decode)]
14pub(crate) enum CompactBlockInitialRequest {
15    #[codec(index = 0)]
16    V0,
17    // Next version/variant goes here:
18    // #[codec(index = 1)]
19}
20
21/// The compact block initial response from the server.
22#[derive(Encode, Decode)]
23pub(crate) struct CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit> {
24    /// The download unit
25    download_unit_id: DownloadUnitId,
26
27    /// List of the protocol units Ids.
28    protocol_units: Vec<ProtocolUnitInfo<ProtocolUnitId, ProtocolUnit>>,
29}
30
31/// The handshake messages from the client.
32#[derive(From, Encode, Decode)]
33pub(crate) enum CompactBlockHandshake<DownloadUnitId, ProtocolUnitId> {
34    /// Request for missing transactions
35    #[codec(index = 0)]
36    MissingEntriesV0(MissingEntriesRequest<DownloadUnitId, ProtocolUnitId>),
37    // Next version/variant goes here:
38    // #[codec(index = 1)]
39}
40
41/// The handshake reply from the server.
42#[derive(From, Encode, Decode)]
43pub(crate) enum CompactBlockHandshakeResponse<ProtocolUnit> {
44    /// Response for missing transactions
45    #[codec(index = 0)]
46    MissingEntriesV0(MissingEntriesResponse<ProtocolUnit>),
47    // Next version/variant goes here:
48    // #[codec(index = 1)]
49}
50
51/// Request for missing transactions
52#[derive(Encode, Decode)]
53pub(crate) struct MissingEntriesRequest<DownloadUnitId, ProtocolUnitId> {
54    /// The download unit
55    download_unit_id: DownloadUnitId,
56
57    /// Map of missing entry Id ->  protocol unit Id.
58    /// The missing entry Id is an opaque identifier used by the client
59    /// side. The server side just returns it as is with the response.
60    protocol_unit_ids: BTreeMap<u64, ProtocolUnitId>,
61}
62
63/// Response for missing transactions
64#[derive(Encode, Decode)]
65pub(crate) struct MissingEntriesResponse<ProtocolUnit> {
66    /// Map of missing entry Id ->  protocol unit.
67    protocol_units: BTreeMap<u64, ProtocolUnit>,
68}
69
70struct ResolveContext<ProtocolUnitId, ProtocolUnit> {
71    resolved: BTreeMap<u64, Resolved<ProtocolUnitId, ProtocolUnit>>,
72    local_miss: BTreeMap<u64, ProtocolUnitId>,
73}
74
/// Client side of the compact block protocol.
///
/// The type holds no data of its own; the marker field only ties the
/// generic parameters to the type.
pub(crate) struct CompactBlockClient<DownloadUnitId, ProtocolUnitId, ProtocolUnit> {
    /// Zero-sized marker for the otherwise unused type parameters.
    _phantom_data: std::marker::PhantomData<(DownloadUnitId, ProtocolUnitId, ProtocolUnit)>,
}
78
79impl<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
80    CompactBlockClient<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
81where
82    DownloadUnitId: Send + Sync + Encode + Decode + Clone + std::fmt::Debug,
83    ProtocolUnitId: Send + Sync + Encode + Decode + Clone,
84    ProtocolUnit: Send + Sync + Encode + Decode + Clone,
85{
86    /// Creates the client.
87    pub(crate) fn new() -> Self {
88        Self {
89            _phantom_data: Default::default(),
90        }
91    }
92
93    /// Builds the initial request.
94    pub(crate) fn build_initial_request(
95        &self,
96        _backend: &dyn ClientBackend<ProtocolUnitId, ProtocolUnit>,
97    ) -> CompactBlockInitialRequest {
98        CompactBlockInitialRequest::V0
99    }
100
101    /// Resolves the initial response to produce the protocol units.
102    pub(crate) async fn resolve_initial_response<Request>(
103        &self,
104        compact_response: CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
105        network_peer_handle: &NetworkPeerHandle,
106        backend: &dyn ClientBackend<ProtocolUnitId, ProtocolUnit>,
107    ) -> Result<(DownloadUnitId, Vec<Resolved<ProtocolUnitId, ProtocolUnit>>), RelayError>
108    where
109        Request: From<CompactBlockHandshake<DownloadUnitId, ProtocolUnitId>> + Encode + Send + Sync,
110    {
111        // Try to resolve the hashes locally first.
112        let context = self.resolve_local(&compact_response, backend)?;
113        if context.resolved.len() == compact_response.protocol_units.len() {
114            trace!(
115                "relay::resolve: {:?}: resolved locally[{}]",
116                compact_response.download_unit_id,
117                compact_response.protocol_units.len()
118            );
119            return Ok((
120                compact_response.download_unit_id,
121                context.resolved.into_values().collect(),
122            ));
123        }
124
125        // Resolve the misses from the server
126        let misses = context.local_miss.len();
127        let download_unit_id = compact_response.download_unit_id.clone();
128        let resolved = self
129            .resolve_misses::<Request>(compact_response, context, network_peer_handle)
130            .await?;
131        trace!(
132            "relay::resolve: {:?}: resolved by server[{},{}]",
133            download_unit_id,
134            resolved.len(),
135            misses,
136        );
137        Ok((download_unit_id, resolved))
138    }
139
140    /// Tries to resolve the entries in InitialResponse locally.
141    fn resolve_local(
142        &self,
143        compact_response: &CompactBlockInitialResponse<
144            DownloadUnitId,
145            ProtocolUnitId,
146            ProtocolUnit,
147        >,
148        backend: &dyn ClientBackend<ProtocolUnitId, ProtocolUnit>,
149    ) -> Result<ResolveContext<ProtocolUnitId, ProtocolUnit>, RelayError> {
150        let mut context = ResolveContext {
151            resolved: BTreeMap::new(),
152            local_miss: BTreeMap::new(),
153        };
154
155        for (index, entry) in compact_response.protocol_units.iter().enumerate() {
156            let ProtocolUnitInfo { id, unit } = entry;
157            if let Some(unit) = unit {
158                // The full protocol unit was returned
159                context.resolved.insert(
160                    index as u64,
161                    Resolved {
162                        protocol_unit_id: id.clone(),
163                        protocol_unit: unit.clone(),
164                        locally_resolved: true,
165                    },
166                );
167                continue;
168            }
169
170            match backend.protocol_unit(id) {
171                Some(ret) => {
172                    context.resolved.insert(
173                        index as u64,
174                        Resolved {
175                            protocol_unit_id: id.clone(),
176                            protocol_unit: ret,
177                            locally_resolved: true,
178                        },
179                    );
180                }
181                None => {
182                    context.local_miss.insert(index as u64, id.clone());
183                }
184            }
185        }
186
187        Ok(context)
188    }
189
190    /// Fetches the missing entries from the server.
191    async fn resolve_misses<Request>(
192        &self,
193        compact_response: CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
194        context: ResolveContext<ProtocolUnitId, ProtocolUnit>,
195        network_peer_handle: &NetworkPeerHandle,
196    ) -> Result<Vec<Resolved<ProtocolUnitId, ProtocolUnit>>, RelayError>
197    where
198        Request: From<CompactBlockHandshake<DownloadUnitId, ProtocolUnitId>> + Encode + Send + Sync,
199    {
200        let ResolveContext {
201            mut resolved,
202            local_miss,
203        } = context;
204        let missing = local_miss.len();
205        // Request the missing entries from the server
206        let request = CompactBlockHandshake::from(MissingEntriesRequest {
207            download_unit_id: compact_response.download_unit_id.clone(),
208            protocol_unit_ids: local_miss.clone(),
209        });
210
211        let response: CompactBlockHandshakeResponse<ProtocolUnit> =
212            network_peer_handle.request(Request::from(request)).await?;
213        let CompactBlockHandshakeResponse::MissingEntriesV0(missing_entries_response) = response;
214
215        if missing_entries_response.protocol_units.len() != missing {
216            return Err(RelayError::ResolveMismatch {
217                expected: missing,
218                actual: missing_entries_response.protocol_units.len(),
219            });
220        }
221
222        // Merge the resolved entries from the server
223        for (missing_key, protocol_unit_id) in local_miss.into_iter() {
224            if let Some(protocol_unit) = missing_entries_response.protocol_units.get(&missing_key) {
225                resolved.insert(
226                    missing_key,
227                    Resolved {
228                        protocol_unit_id,
229                        protocol_unit: protocol_unit.clone(),
230                        locally_resolved: false,
231                    },
232                );
233            } else {
234                return Err(RelayError::ResolvedNotFound(missing));
235            }
236        }
237
238        Ok(resolved.into_values().collect())
239    }
240}
241
/// Server side of the compact block protocol.
///
/// The type holds no data of its own; the marker field only ties the
/// generic parameters to the type.
pub(crate) struct CompactBlockServer<DownloadUnitId, ProtocolUnitId, ProtocolUnit> {
    /// Zero-sized marker for the otherwise unused type parameters.
    _phantom_data: std::marker::PhantomData<(DownloadUnitId, ProtocolUnitId, ProtocolUnit)>,
}
245
246impl<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
247    CompactBlockServer<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
248where
249    DownloadUnitId: Encode + Decode + Clone,
250    ProtocolUnitId: Encode + Decode + Clone,
251    ProtocolUnit: Encode + Decode,
252{
253    /// Creates the server.
254    pub(crate) fn new() -> Self {
255        Self {
256            _phantom_data: Default::default(),
257        }
258    }
259
260    /// Builds the protocol response to the initial request.
261    pub(crate) fn build_initial_response(
262        &self,
263        download_unit_id: &DownloadUnitId,
264        _initial_request: CompactBlockInitialRequest,
265        backend: &dyn ServerBackend<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
266    ) -> Result<CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit>, RelayError>
267    {
268        // Return the info of the members in the download unit.
269        Ok(CompactBlockInitialResponse {
270            download_unit_id: download_unit_id.clone(),
271            protocol_units: backend.download_unit_members(download_unit_id)?,
272        })
273    }
274
275    /// Handles the additional client messages during the reconcile phase.
276    pub(crate) fn on_protocol_message(
277        &self,
278        message: CompactBlockHandshake<DownloadUnitId, ProtocolUnitId>,
279        backend: &dyn ServerBackend<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
280    ) -> Result<CompactBlockHandshakeResponse<ProtocolUnit>, RelayError> {
281        let CompactBlockHandshake::MissingEntriesV0(request) = message;
282
283        let mut protocol_units = BTreeMap::new();
284        let total_len = request.protocol_unit_ids.len();
285        for (missing_id, protocol_unit_id) in request.protocol_unit_ids {
286            if let Some(protocol_unit) =
287                backend.protocol_unit(&request.download_unit_id, &protocol_unit_id)
288            {
289                protocol_units.insert(missing_id, protocol_unit);
290            } else {
291                warn!("relay::on_request: missing entry not found");
292            }
293        }
294        if total_len != protocol_units.len() {
295            warn!(
296                "relay::compact_blocks::on_request: could not resolve all entries: {total_len}/{}",
297                protocol_units.len()
298            );
299        }
300        Ok(CompactBlockHandshakeResponse::from(
301            MissingEntriesResponse { protocol_units },
302        ))
303    }
304}