sc_subspace_block_relay/protocol/
compact_block.rs1use crate::protocol::{ClientBackend, ProtocolUnitInfo, Resolved, ServerBackend};
4use crate::types::RelayError;
5use crate::utils::NetworkPeerHandle;
6use derive_more::From;
7use parity_scale_codec::{Decode, Encode};
8use std::collections::BTreeMap;
9use tracing::{trace, warn};
10
/// Initial request of the compact block protocol.
///
/// Produced by `CompactBlockClient::build_initial_request` and accepted by
/// `CompactBlockServer::build_initial_response`. The request currently carries no data.
#[derive(From, Encode, Decode)]
pub(crate) enum CompactBlockInitialRequest {
    /// Version 0 of the request; it has no fields. The codec index is pinned explicitly.
    #[codec(index = 0)]
    V0,
}
20
/// Initial response of the compact block protocol.
///
/// Built by `CompactBlockServer::build_initial_response` and consumed by
/// `CompactBlockClient::resolve_initial_response`.
#[derive(Encode, Decode)]
pub(crate) struct CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit> {
    /// Identifier of the download unit this response describes.
    download_unit_id: DownloadUnitId,

    /// Members of the download unit. Each entry carries the unit's id and, optionally,
    /// the unit itself; entries without the unit are resolved by the client, either from
    /// its local backend or through a follow-up `MissingEntriesRequest`. The position of
    /// an entry in this vector is the index used as key in that follow-up exchange.
    protocol_units: Vec<ProtocolUnitInfo<ProtocolUnitId, ProtocolUnit>>,
}
30
/// Follow-up message sent by the client after the initial response.
///
/// Handled on the server side by `CompactBlockServer::on_protocol_message`.
#[derive(From, Encode, Decode)]
pub(crate) enum CompactBlockHandshake<DownloadUnitId, ProtocolUnitId> {
    /// Version 0 request for the protocol units the client could not resolve locally.
    #[codec(index = 0)]
    MissingEntriesV0(MissingEntriesRequest<DownloadUnitId, ProtocolUnitId>),
}
40
/// Server reply to a `CompactBlockHandshake` message.
#[derive(From, Encode, Decode)]
pub(crate) enum CompactBlockHandshakeResponse<ProtocolUnit> {
    /// Version 0 reply carrying the protocol units that were requested as missing.
    #[codec(index = 0)]
    MissingEntriesV0(MissingEntriesResponse<ProtocolUnit>),
}
50
/// Request for the protocol units the client could not resolve locally.
#[derive(Encode, Decode)]
pub(crate) struct MissingEntriesRequest<DownloadUnitId, ProtocolUnitId> {
    /// Identifier of the download unit the missing entries belong to.
    download_unit_id: DownloadUnitId,

    /// Ids of the missing protocol units, keyed by the entry's index in the
    /// `protocol_units` vector of the initial response (see
    /// `CompactBlockClient::resolve_local`). The server echoes the same keys back.
    protocol_unit_ids: BTreeMap<u64, ProtocolUnitId>,
}
62
/// Reply to a `MissingEntriesRequest`.
#[derive(Encode, Decode)]
pub(crate) struct MissingEntriesResponse<ProtocolUnit> {
    /// Protocol units the server was able to find, keyed by the same key the client used
    /// in `MissingEntriesRequest::protocol_unit_ids`. Entries the server could not find
    /// are absent, so the map may be smaller than the request.
    protocol_units: BTreeMap<u64, ProtocolUnit>,
}
69
/// Intermediate state of resolving an initial response on the client.
///
/// Both maps are keyed by the entry's index in the initial response; `resolve_local`
/// places every index in exactly one of them.
struct ResolveContext<ProtocolUnitId, ProtocolUnit> {
    /// Entries that are already resolved to a full protocol unit.
    resolved: BTreeMap<u64, Resolved<ProtocolUnitId, ProtocolUnit>>,
    /// Ids of the entries that could not be resolved locally and must be fetched.
    local_miss: BTreeMap<u64, ProtocolUnitId>,
}
74
/// Client side of the compact block protocol.
///
/// Holds no runtime state; the only field is a marker for the generic parameters.
pub(crate) struct CompactBlockClient<DownloadUnitId, ProtocolUnitId, ProtocolUnit> {
    /// Zero-sized marker tying the generic parameters to the type.
    _phantom_data: std::marker::PhantomData<(DownloadUnitId, ProtocolUnitId, ProtocolUnit)>,
}
78
79impl<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
80 CompactBlockClient<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
81where
82 DownloadUnitId: Send + Sync + Encode + Decode + Clone + std::fmt::Debug,
83 ProtocolUnitId: Send + Sync + Encode + Decode + Clone,
84 ProtocolUnit: Send + Sync + Encode + Decode + Clone,
85{
86 pub(crate) fn new() -> Self {
88 Self {
89 _phantom_data: Default::default(),
90 }
91 }
92
93 pub(crate) fn build_initial_request(
95 &self,
96 _backend: &dyn ClientBackend<ProtocolUnitId, ProtocolUnit>,
97 ) -> CompactBlockInitialRequest {
98 CompactBlockInitialRequest::V0
99 }
100
101 pub(crate) async fn resolve_initial_response<Request>(
103 &self,
104 compact_response: CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
105 network_peer_handle: &NetworkPeerHandle,
106 backend: &dyn ClientBackend<ProtocolUnitId, ProtocolUnit>,
107 ) -> Result<(DownloadUnitId, Vec<Resolved<ProtocolUnitId, ProtocolUnit>>), RelayError>
108 where
109 Request: From<CompactBlockHandshake<DownloadUnitId, ProtocolUnitId>> + Encode + Send + Sync,
110 {
111 let context = self.resolve_local(&compact_response, backend)?;
113 if context.resolved.len() == compact_response.protocol_units.len() {
114 trace!(
115 "relay::resolve: {:?}: resolved locally[{}]",
116 compact_response.download_unit_id,
117 compact_response.protocol_units.len()
118 );
119 return Ok((
120 compact_response.download_unit_id,
121 context.resolved.into_values().collect(),
122 ));
123 }
124
125 let misses = context.local_miss.len();
127 let download_unit_id = compact_response.download_unit_id.clone();
128 let resolved = self
129 .resolve_misses::<Request>(compact_response, context, network_peer_handle)
130 .await?;
131 trace!(
132 "relay::resolve: {:?}: resolved by server[{},{}]",
133 download_unit_id,
134 resolved.len(),
135 misses,
136 );
137 Ok((download_unit_id, resolved))
138 }
139
140 fn resolve_local(
142 &self,
143 compact_response: &CompactBlockInitialResponse<
144 DownloadUnitId,
145 ProtocolUnitId,
146 ProtocolUnit,
147 >,
148 backend: &dyn ClientBackend<ProtocolUnitId, ProtocolUnit>,
149 ) -> Result<ResolveContext<ProtocolUnitId, ProtocolUnit>, RelayError> {
150 let mut context = ResolveContext {
151 resolved: BTreeMap::new(),
152 local_miss: BTreeMap::new(),
153 };
154
155 for (index, entry) in compact_response.protocol_units.iter().enumerate() {
156 let ProtocolUnitInfo { id, unit } = entry;
157 if let Some(unit) = unit {
158 context.resolved.insert(
160 index as u64,
161 Resolved {
162 protocol_unit_id: id.clone(),
163 protocol_unit: unit.clone(),
164 locally_resolved: true,
165 },
166 );
167 continue;
168 }
169
170 match backend.protocol_unit(id) {
171 Some(ret) => {
172 context.resolved.insert(
173 index as u64,
174 Resolved {
175 protocol_unit_id: id.clone(),
176 protocol_unit: ret,
177 locally_resolved: true,
178 },
179 );
180 }
181 None => {
182 context.local_miss.insert(index as u64, id.clone());
183 }
184 }
185 }
186
187 Ok(context)
188 }
189
190 async fn resolve_misses<Request>(
192 &self,
193 compact_response: CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
194 context: ResolveContext<ProtocolUnitId, ProtocolUnit>,
195 network_peer_handle: &NetworkPeerHandle,
196 ) -> Result<Vec<Resolved<ProtocolUnitId, ProtocolUnit>>, RelayError>
197 where
198 Request: From<CompactBlockHandshake<DownloadUnitId, ProtocolUnitId>> + Encode + Send + Sync,
199 {
200 let ResolveContext {
201 mut resolved,
202 local_miss,
203 } = context;
204 let missing = local_miss.len();
205 let request = CompactBlockHandshake::from(MissingEntriesRequest {
207 download_unit_id: compact_response.download_unit_id.clone(),
208 protocol_unit_ids: local_miss.clone(),
209 });
210
211 let response: CompactBlockHandshakeResponse<ProtocolUnit> =
212 network_peer_handle.request(Request::from(request)).await?;
213 let CompactBlockHandshakeResponse::MissingEntriesV0(missing_entries_response) = response;
214
215 if missing_entries_response.protocol_units.len() != missing {
216 return Err(RelayError::ResolveMismatch {
217 expected: missing,
218 actual: missing_entries_response.protocol_units.len(),
219 });
220 }
221
222 for (missing_key, protocol_unit_id) in local_miss.into_iter() {
224 if let Some(protocol_unit) = missing_entries_response.protocol_units.get(&missing_key) {
225 resolved.insert(
226 missing_key,
227 Resolved {
228 protocol_unit_id,
229 protocol_unit: protocol_unit.clone(),
230 locally_resolved: false,
231 },
232 );
233 } else {
234 return Err(RelayError::ResolvedNotFound(missing));
235 }
236 }
237
238 Ok(resolved.into_values().collect())
239 }
240}
241
/// Server side of the compact block protocol.
///
/// Holds no runtime state; the only field is a marker for the generic parameters.
pub(crate) struct CompactBlockServer<DownloadUnitId, ProtocolUnitId, ProtocolUnit> {
    /// Zero-sized marker tying the generic parameters to the type.
    _phantom_data: std::marker::PhantomData<(DownloadUnitId, ProtocolUnitId, ProtocolUnit)>,
}
245
246impl<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
247 CompactBlockServer<DownloadUnitId, ProtocolUnitId, ProtocolUnit>
248where
249 DownloadUnitId: Encode + Decode + Clone,
250 ProtocolUnitId: Encode + Decode + Clone,
251 ProtocolUnit: Encode + Decode,
252{
253 pub(crate) fn new() -> Self {
255 Self {
256 _phantom_data: Default::default(),
257 }
258 }
259
260 pub(crate) fn build_initial_response(
262 &self,
263 download_unit_id: &DownloadUnitId,
264 _initial_request: CompactBlockInitialRequest,
265 backend: &dyn ServerBackend<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
266 ) -> Result<CompactBlockInitialResponse<DownloadUnitId, ProtocolUnitId, ProtocolUnit>, RelayError>
267 {
268 Ok(CompactBlockInitialResponse {
270 download_unit_id: download_unit_id.clone(),
271 protocol_units: backend.download_unit_members(download_unit_id)?,
272 })
273 }
274
275 pub(crate) fn on_protocol_message(
277 &self,
278 message: CompactBlockHandshake<DownloadUnitId, ProtocolUnitId>,
279 backend: &dyn ServerBackend<DownloadUnitId, ProtocolUnitId, ProtocolUnit>,
280 ) -> Result<CompactBlockHandshakeResponse<ProtocolUnit>, RelayError> {
281 let CompactBlockHandshake::MissingEntriesV0(request) = message;
282
283 let mut protocol_units = BTreeMap::new();
284 let total_len = request.protocol_unit_ids.len();
285 for (missing_id, protocol_unit_id) in request.protocol_unit_ids {
286 if let Some(protocol_unit) =
287 backend.protocol_unit(&request.download_unit_id, &protocol_unit_id)
288 {
289 protocol_units.insert(missing_id, protocol_unit);
290 } else {
291 warn!("relay::on_request: missing entry not found");
292 }
293 }
294 if total_len != protocol_units.len() {
295 warn!(
296 "relay::compact_blocks::on_request: could not resolve all entries: {total_len}/{}",
297 protocol_units.len()
298 );
299 }
300 Ok(CompactBlockHandshakeResponse::from(
301 MissingEntriesResponse { protocol_units },
302 ))
303 }
304}