1use crate::consensus::types::{
4 ConsensusClientMetrics, ConsensusRequest, ConsensusServerMetrics, FullDownloadRequest,
5 FullDownloadResponse, InitialRequest, InitialResponse, PartialBlock, ProtocolInitialRequest,
6 ProtocolInitialResponse, ProtocolMessage,
7};
8use crate::protocol::compact_block::{
9 CompactBlockClient, CompactBlockHandshake, CompactBlockServer,
10};
11use crate::protocol::{ClientBackend, ProtocolUnitInfo, ServerBackend};
12use crate::types::{RelayError, RequestResponseErr};
13use crate::utils::{NetworkPeerHandle, NetworkWrapper};
14use async_trait::async_trait;
15use futures::channel::oneshot;
16use futures::stream::StreamExt;
17use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
18use sc_client_api::{BlockBackend, HeaderBackend};
19use sc_network::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
20use sc_network::types::ProtocolName;
21use sc_network::{NetworkWorker, OutboundFailure, PeerId, RequestFailure};
22use sc_network_common::sync::message::{
23 BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
24};
25use sc_network_sync::block_relay_protocol::{
26 BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer,
27};
28use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
29use sp_api::ProvideRuntimeApi;
30use sp_consensus_subspace::SubspaceApi;
31use sp_runtime::generic::BlockId;
32use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
33use std::fmt;
34use std::num::{NonZeroU32, NonZeroUsize};
35use std::sync::Arc;
36use std::time::{Duration, Instant};
37use subspace_core_primitives::PublicKey;
38use subspace_runtime_primitives::{BlockHashFor, ExtrinsicFor};
39use substrate_prometheus_endpoint::{PrometheusError, Registry};
40use tracing::{debug, info, trace, warn};
41
/// Name under which the consensus block relay request/response protocol is registered.
const SYNC_PROTOCOL: &str = "/subspace/consensus-block-relay/1";

/// Capacity of the bounded channel that carries inbound requests to the relay server.
const NUM_PEER_HINT: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed");

/// Cap, in encoded bytes, on the blocks accumulated into one full-download response.
/// The first block is always included, even if it alone exceeds this cap.
const MAX_RESPONSE_SIZE: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("Not zero; qed");
/// Cap on the number of blocks returned in one full-download response.
const MAX_RESPONSE_BLOCKS: NonZeroU32 = NonZeroU32::new(128).expect("Not zero; qed");

/// Extrinsics whose encoded size is at most this many bytes are attached directly to the
/// unit list built by the server backend; larger ones are attached only when the runtime
/// reports them as inherents.
const TX_SIZE_THRESHOLD: NonZeroUsize = NonZeroUsize::new(32).expect("Not zero; qed");
56
57struct ConsensusRelayClient<Block, Pool>
59where
60 Block: BlockT,
61 Pool: TransactionPool,
62{
63 network: Arc<NetworkWrapper>,
64 protocol_name: ProtocolName,
65 compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
66 backend: Arc<ConsensusClientBackend<Pool>>,
67 metrics: ConsensusClientMetrics,
68 _phantom_data: std::marker::PhantomData<(Block, Pool)>,
69}
70
71impl<Block, Pool> fmt::Debug for ConsensusRelayClient<Block, Pool>
72where
73 Block: BlockT,
74 Pool: TransactionPool,
75{
76 #[inline]
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("ConsensusRelayClient")
79 .field("protocol_name", &self.protocol_name)
80 .finish_non_exhaustive()
81 }
82}
83
84impl<Block, Pool> ConsensusRelayClient<Block, Pool>
85where
86 Block: BlockT,
87 Pool: TransactionPool<Block = Block> + 'static,
88{
89 fn new(
91 network: Arc<NetworkWrapper>,
92 protocol_name: ProtocolName,
93 compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
94 backend: Arc<ConsensusClientBackend<Pool>>,
95 metrics: ConsensusClientMetrics,
96 ) -> Self {
97 Self {
98 network,
99 protocol_name,
100 compact_block,
101 backend,
102 metrics,
103 _phantom_data: Default::default(),
104 }
105 }
106
107 async fn download(
109 &self,
110 who: PeerId,
111 request: BlockRequest<Block>,
112 ) -> Result<Vec<BlockData<Block>>, RelayError> {
113 let start_ts = Instant::now();
114 let network_peer_handle = self
115 .network
116 .network_peer_handle(self.protocol_name.clone(), who)?;
117
118 let initial_request = InitialRequest {
120 from_block: match request.from {
121 FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
122 FromBlock::Number(n) => BlockId::<Block>::Number(n),
123 },
124 block_attributes: request.fields,
125 protocol_request: ProtocolInitialRequest::from(
126 self.compact_block
127 .build_initial_request(self.backend.as_ref()),
128 ),
129 };
130 let initial_response = network_peer_handle
131 .request::<_, InitialResponse<Block, TxHash<Pool>>>(ConsensusRequest::<
132 Block,
133 TxHash<Pool>,
134 >::from(initial_request))
135 .await?;
136
137 let (body, local_miss) = if let Some(protocol_response) = initial_response.protocol_response
139 {
140 let (body, local_miss) = self
141 .resolve_extrinsics::<ConsensusRequest<Block, TxHash<Pool>>>(
142 protocol_response,
143 &network_peer_handle,
144 )
145 .await?;
146 (Some(body), local_miss)
147 } else {
148 (None, 0)
149 };
150
151 let downloaded = vec![initial_response.partial_block.block_data(body)];
153 debug!(
154 block_hash = ?initial_response.block_hash,
155 download_bytes = %downloaded.encoded_size(),
156 %local_miss,
157 duration = ?start_ts.elapsed(),
158 "block_download",
159 );
160 Ok(downloaded)
161 }
162
163 async fn full_download(
165 &self,
166 who: PeerId,
167 request: BlockRequest<Block>,
168 ) -> Result<Vec<BlockData<Block>>, RelayError> {
169 let start_ts = Instant::now();
170 let network_peer_handle = self
171 .network
172 .network_peer_handle(self.protocol_name.clone(), who)?;
173
174 let server_request =
175 ConsensusRequest::<Block, TxHash<Pool>>::from(FullDownloadRequest(request.clone()));
176 let full_response = network_peer_handle
177 .request::<_, FullDownloadResponse<Block>>(server_request)
178 .await?;
179 let downloaded = full_response.0;
180
181 debug!(
182 ?request,
183 download_blocks = %downloaded.len(),
184 download_bytes = %downloaded.encoded_size(),
185 duration = ?start_ts.elapsed(),
186 "full_download",
187 );
188 Ok(downloaded)
189 }
190
191 async fn resolve_extrinsics<Request>(
193 &self,
194 protocol_response: ProtocolInitialResponse<Block, TxHash<Pool>>,
195 network_peer_handle: &NetworkPeerHandle,
196 ) -> Result<(Vec<ExtrinsicFor<Block>>, usize), RelayError>
197 where
198 Request:
199 From<CompactBlockHandshake<BlockHashFor<Block>, TxHash<Pool>>> + Encode + Send + Sync,
200 {
201 let ProtocolInitialResponse::CompactBlock(compact_response) = protocol_response;
202 let (block_hash, resolved) = self
203 .compact_block
204 .resolve_initial_response::<Request>(
205 compact_response,
206 network_peer_handle,
207 self.backend.as_ref(),
208 )
209 .await?;
210 let mut local_miss = 0;
211 let extrinsics = resolved
212 .into_iter()
213 .map(|entry| {
214 let encoded = entry.protocol_unit.encode();
215 if !entry.locally_resolved {
216 trace!(
217 ?block_hash,
218 tx_hash = ?entry.protocol_unit_id,
219 tx_size = %encoded.len(),
220 "resolve_extrinsics: local miss"
221 );
222 self.metrics.tx_pool_miss.inc();
223 local_miss += encoded.len();
224 }
225 entry.protocol_unit
226 })
227 .collect();
228 Ok((extrinsics, local_miss))
229 }
230}
231
232#[async_trait]
233impl<Block, Pool> BlockDownloader<Block> for ConsensusRelayClient<Block, Pool>
234where
235 Block: BlockT,
236 Pool: TransactionPool<Block = Block> + 'static,
237{
238 fn protocol_name(&self) -> &ProtocolName {
239 &self.protocol_name
240 }
241
242 async fn download_blocks(
243 &self,
244 who: PeerId,
245 request: BlockRequest<Block>,
246 ) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled> {
247 let full_download = request.max.is_some_and(|max_blocks| max_blocks > 1);
248 let ret = if full_download {
249 self.full_download(who, request.clone()).await
250 } else {
251 self.download(who, request.clone()).await
252 };
253 match ret {
254 Ok(blocks) => {
255 self.metrics.on_download::<Block>(&blocks);
256 Ok(Ok((blocks.encode(), self.protocol_name.clone())))
257 }
258 Err(error) => {
259 debug!(
260 peer=?who,
261 ?request,
262 ?error,
263 "download_block failed"
264 );
265 self.metrics.on_download_fail(&error);
266 match error {
267 RelayError::RequestResponse(error) => match error {
268 RequestResponseErr::DecodeFailed { .. } => {
269 Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
270 }
271 RequestResponseErr::RequestFailure(err) => Ok(Err(err)),
272 RequestResponseErr::NetworkUninitialized => {
273 Ok(Err(RequestFailure::NotConnected))
275 }
276 RequestResponseErr::Canceled => Err(oneshot::Canceled),
277 },
278 _ => {
279 Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
281 }
282 }
283 }
284 }
285 }
286
287 fn block_response_into_blocks(
288 &self,
289 _request: &BlockRequest<Block>,
290 response: Vec<u8>,
291 ) -> Result<Vec<BlockData<Block>>, BlockResponseError> {
292 Decode::decode(&mut response.as_ref()).map_err(|err| {
293 BlockResponseError::DecodeFailed(format!(
294 "Failed to decode consensus response: {err:?}"
295 ))
296 })
297 }
298}
299
300struct ConsensusRelayServer<Block: BlockT, Client, Pool: TransactionPool> {
302 client: Arc<Client>,
303 compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
304 request_receiver: async_channel::Receiver<IncomingRequest>,
305 backend: Arc<ConsensusServerBackend<Client, Pool>>,
306 metrics: ConsensusServerMetrics,
307 _block: std::marker::PhantomData<Block>,
308}
309
310impl<Block, Client, Pool> ConsensusRelayServer<Block, Client, Pool>
311where
312 Block: BlockT,
313 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
314 Client::Api: SubspaceApi<Block, PublicKey>,
315 Pool: TransactionPool<Block = Block> + 'static,
316{
317 fn new(
319 client: Arc<Client>,
320 compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
321 request_receiver: async_channel::Receiver<IncomingRequest>,
322 backend: Arc<ConsensusServerBackend<Client, Pool>>,
323 metrics: ConsensusServerMetrics,
324 ) -> Self {
325 Self {
326 client,
327 compact_block,
328 request_receiver,
329 backend,
330 metrics,
331 _block: Default::default(),
332 }
333 }
334
335 async fn process_incoming_request(&self, request: IncomingRequest) {
337 let IncomingRequest {
340 peer,
341 payload,
342 pending_response,
343 } = request;
344 let req: ConsensusRequest<Block, TxHash<Pool>> = match Decode::decode(&mut payload.as_ref())
345 {
346 Ok(msg) => msg,
347 Err(err) => {
348 warn!(?peer, ?err, "Decode failed");
349 return;
350 }
351 };
352
353 let ret = match req {
354 ConsensusRequest::BlockDownloadV0(req) => {
355 self.on_initial_request(req).map(|rsp| rsp.encode())
356 }
357 ConsensusRequest::ProtocolMessageV0(msg) => self.on_protocol_message(msg),
358 ConsensusRequest::FullBlockDownloadV0(req) => {
359 self.on_full_download_request(req).map(|rsp| rsp.encode())
360 }
361 };
362
363 match ret {
364 Ok(response) => {
365 self.metrics.on_request();
366 self.send_response(peer, response, pending_response);
367 trace!(?peer, "server: request processed from");
368 }
369 Err(error) => {
370 self.metrics.on_failed_request(&error);
371 debug!(?peer, ?error, "Server error");
372 }
373 }
374 }
375
376 fn on_initial_request(
378 &self,
379 initial_request: InitialRequest<Block>,
380 ) -> Result<InitialResponse<Block, TxHash<Pool>>, RelayError> {
381 let block_hash = self.block_hash(&initial_request.from_block)?;
382 let block_attributes = initial_request.block_attributes;
383
384 let partial_block = self.get_partial_block(&block_hash, block_attributes)?;
386 let ProtocolInitialRequest::CompactBlock(compact_request) =
387 initial_request.protocol_request;
388 let protocol_response = if block_attributes.contains(BlockAttributes::BODY) {
389 let compact_response = self.compact_block.build_initial_response(
390 &block_hash,
391 compact_request,
392 self.backend.as_ref(),
393 )?;
394 Some(ProtocolInitialResponse::from(compact_response))
395 } else {
396 None
397 };
398
399 Ok(InitialResponse {
400 block_hash,
401 partial_block,
402 protocol_response,
403 })
404 }
405
406 fn on_protocol_message(
408 &self,
409 msg: ProtocolMessage<Block, TxHash<Pool>>,
410 ) -> Result<Vec<u8>, RelayError> {
411 let response = match msg {
412 ProtocolMessage::CompactBlock(msg) => self
413 .compact_block
414 .on_protocol_message(msg, self.backend.as_ref())?
415 .encode(),
416 };
417 Ok(response)
418 }
419
420 fn on_full_download_request(
422 &self,
423 full_download_request: FullDownloadRequest<Block>,
424 ) -> Result<FullDownloadResponse<Block>, RelayError> {
425 let block_request = full_download_request.0;
426 let mut blocks = Vec::new();
427 let mut block_id = match block_request.from {
428 FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
429 FromBlock::Number(n) => BlockId::<Block>::Number(n),
430 };
431
432 let mut total_size = Compact::compact_len(&MAX_RESPONSE_BLOCKS.get());
434 let max_blocks = block_request.max.map_or(MAX_RESPONSE_BLOCKS.into(), |val| {
435 std::cmp::min(val, MAX_RESPONSE_BLOCKS.into())
436 });
437 while blocks.len() < max_blocks as usize {
438 let block_hash = self.block_hash(&block_id)?;
439 let partial_block = self.get_partial_block(&block_hash, block_request.fields)?;
440 let body = if block_request.fields.contains(BlockAttributes::BODY) {
441 Some(block_transactions(&block_hash, self.client.as_ref())?)
442 } else {
443 None
444 };
445 let block_number = partial_block.block_number;
446 let parent_hash = partial_block.parent_hash;
447 let block_data = partial_block.block_data(body);
448
449 let bytes = block_data.encoded_size();
451 if !blocks.is_empty() && (total_size + bytes) > MAX_RESPONSE_SIZE.into() {
452 break;
453 }
454 total_size += bytes;
455 blocks.push(block_data);
456
457 block_id = match block_request.direction {
458 Direction::Ascending => BlockId::Number(block_number + One::one()),
459 Direction::Descending => {
460 if block_number.is_zero() {
461 break;
462 }
463 BlockId::Hash(parent_hash)
464 }
465 };
466 }
467
468 Ok(FullDownloadResponse(blocks))
469 }
470
471 fn get_partial_block(
473 &self,
474 block_hash: &BlockHashFor<Block>,
475 block_attributes: BlockAttributes,
476 ) -> Result<PartialBlock<Block>, RelayError> {
477 let block_header = match self.client.header(*block_hash) {
478 Ok(Some(header)) => header,
479 Ok(None) => {
480 return Err(RelayError::BlockHeader(format!(
481 "Missing header: {block_hash:?}"
482 )));
483 }
484 Err(err) => return Err(RelayError::BlockHeader(format!("{block_hash:?}, {err:?}"))),
485 };
486 let parent_hash = *block_header.parent_hash();
487 let block_number = *block_header.number();
488 let block_header_hash = block_header.hash();
489
490 let header = if block_attributes.contains(BlockAttributes::HEADER) {
491 Some(block_header)
492 } else {
493 None
494 };
495
496 let indexed_body = if block_attributes.contains(BlockAttributes::INDEXED_BODY) {
497 self.client
498 .block_indexed_body(*block_hash)
499 .map_err(|err| RelayError::BlockIndexedBody(format!("{block_hash:?}, {err:?}")))?
500 } else {
501 None
502 };
503
504 let justifications = if block_attributes.contains(BlockAttributes::JUSTIFICATION) {
505 self.client.justifications(*block_hash).map_err(|err| {
506 RelayError::BlockJustifications(format!("{block_hash:?}, {err:?}"))
507 })?
508 } else {
509 None
510 };
511
512 Ok(PartialBlock {
513 parent_hash,
514 block_number,
515 block_header_hash,
516 header,
517 indexed_body,
518 justifications,
519 })
520 }
521
522 fn block_hash(&self, block_id: &BlockId<Block>) -> Result<BlockHashFor<Block>, RelayError> {
524 match self.client.block_hash_from_id(block_id) {
525 Ok(Some(hash)) => Ok(hash),
526 Ok(None) => Err(RelayError::BlockHash(format!("Missing: {block_id:?}"))),
527 Err(err) => Err(RelayError::BlockHash(format!("{block_id:?}, {err:?}"))),
528 }
529 }
530
531 fn send_response(
533 &self,
534 peer: PeerId,
535 response: Vec<u8>,
536 sender: oneshot::Sender<OutgoingResponse>,
537 ) {
538 let response = OutgoingResponse {
539 result: Ok(response),
540 reputation_changes: Vec::new(),
541 sent_feedback: None,
542 };
543 if sender.send(response).is_err() {
544 warn!(?peer, "Failed to send response");
545 }
546 }
547}
548
549#[async_trait]
550impl<Block, Client, Pool> BlockServer<Block> for ConsensusRelayServer<Block, Client, Pool>
551where
552 Block: BlockT,
553 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
554 Client::Api: SubspaceApi<Block, PublicKey>,
555 Pool: TransactionPool<Block = Block> + 'static,
556{
557 async fn run(&mut self) {
558 info!("relay::consensus block server: starting");
559 while let Some(request) = self.request_receiver.next().await {
560 self.process_incoming_request(request).await;
561 }
562 }
563}
564
/// Backend given to the client side of the compact block protocol; resolves transactions
/// from the local transaction pool.
struct ConsensusClientBackend<Pool> {
    /// Pool queried for ready transactions.
    transaction_pool: Arc<Pool>,
}
569
570impl<Block, Pool> ClientBackend<TxHash<Pool>, ExtrinsicFor<Block>> for ConsensusClientBackend<Pool>
571where
572 Block: BlockT,
573 Pool: TransactionPool<Block = Block> + 'static,
574{
575 fn protocol_unit(&self, tx_hash: &TxHash<Pool>) -> Option<ExtrinsicFor<Block>> {
576 self.transaction_pool
578 .ready_transaction(tx_hash)
579 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
580 }
581}
582
/// Backend given to the server side of the compact block protocol; reads block bodies from
/// the chain client and falls back to the transaction pool for individual transactions.
struct ConsensusServerBackend<Client, Pool> {
    /// Chain client used to read block bodies and call the runtime API.
    client: Arc<Client>,
    /// Pool used to hash extrinsics and look up ready transactions.
    transaction_pool: Arc<Pool>,
}
588
589impl<Block, Client, Pool> ServerBackend<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>
590 for ConsensusServerBackend<Client, Pool>
591where
592 Block: BlockT,
593 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
594 Client::Api: SubspaceApi<Block, PublicKey>,
595 Pool: TransactionPool<Block = Block> + 'static,
596{
597 fn download_unit_members(
598 &self,
599 block_hash: &BlockHashFor<Block>,
600 ) -> Result<Vec<ProtocolUnitInfo<TxHash<Pool>, ExtrinsicFor<Block>>>, RelayError> {
601 let txns = block_transactions(block_hash, self.client.as_ref())?;
602 Ok(txns
603 .into_iter()
604 .map(|extrinsic| {
605 let send_tx = extrinsic.encoded_size() <= TX_SIZE_THRESHOLD.get()
606 || self
607 .client
608 .runtime_api()
609 .is_inherent(*block_hash, &extrinsic)
610 .unwrap_or(false);
611 ProtocolUnitInfo {
612 id: self.transaction_pool.hash_of(&extrinsic),
613 unit: if send_tx { Some(extrinsic) } else { None },
614 }
615 })
616 .collect())
617 }
618
619 fn protocol_unit(
620 &self,
621 block_hash: &BlockHashFor<Block>,
622 tx_hash: &TxHash<Pool>,
623 ) -> Option<ExtrinsicFor<Block>> {
624 match block_transactions(block_hash, self.client.as_ref()) {
626 Ok(extrinsics) => {
627 for extrinsic in extrinsics {
628 if self.transaction_pool.hash_of(&extrinsic) == *tx_hash {
629 return Some(extrinsic);
630 }
631 }
632 }
633 Err(err) => {
634 debug!(
635 ?block_hash,
636 ?tx_hash,
637 ?err,
638 "consensus server protocol_unit: "
639 );
640 }
641 }
642
643 self.transaction_pool
645 .ready_transaction(tx_hash)
646 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
647 }
648}
649
650fn block_transactions<Block, Client>(
652 block_hash: &BlockHashFor<Block>,
653 client: &Client,
654) -> Result<Vec<ExtrinsicFor<Block>>, RelayError>
655where
656 Block: BlockT,
657 Client: HeaderBackend<Block> + BlockBackend<Block>,
658{
659 let maybe_extrinsics = client
660 .block_body(*block_hash)
661 .map_err(|err| RelayError::BlockBody(format!("{block_hash:?}, {err:?}")))?;
662 maybe_extrinsics.ok_or(RelayError::BlockExtrinsicsNotFound(format!(
663 "{block_hash:?}"
664 )))
665}
666
/// Errors returned while building the consensus block relay.
#[derive(Debug, thiserror::Error)]
pub enum BlockRelayConfigurationError {
    /// Wraps a `PrometheusError`; convertible from it through `From`.
    #[error(transparent)]
    PrometheusError(#[from] PrometheusError),
}
672
673pub fn build_consensus_relay<Block, Client, Pool>(
675 network: Arc<NetworkWrapper>,
676 client: Arc<Client>,
677 pool: Arc<Pool>,
678 registry: Option<&Registry>,
679) -> Result<
680 BlockRelayParams<Block, NetworkWorker<Block, BlockHashFor<Block>>>,
681 BlockRelayConfigurationError,
682>
683where
684 Block: BlockT,
685 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block> + 'static,
686 Client::Api: SubspaceApi<Block, PublicKey>,
687 Pool: TransactionPool<Block = Block> + 'static,
688{
689 let (tx, request_receiver) = async_channel::bounded(NUM_PEER_HINT.get());
690
691 let backend = Arc::new(ConsensusClientBackend {
692 transaction_pool: pool.clone(),
693 });
694 let metrics = ConsensusClientMetrics::new(registry)
695 .map_err(BlockRelayConfigurationError::PrometheusError)?;
696 let relay_client: ConsensusRelayClient<Block, Pool> = ConsensusRelayClient::new(
697 network,
698 SYNC_PROTOCOL.into(),
699 CompactBlockClient::new(),
700 backend,
701 metrics,
702 );
703
704 let backend = Arc::new(ConsensusServerBackend {
705 client: client.clone(),
706 transaction_pool: pool.clone(),
707 });
708 let metrics = ConsensusServerMetrics::new(registry)
709 .map_err(BlockRelayConfigurationError::PrometheusError)?;
710 let relay_server = ConsensusRelayServer::new(
711 client,
712 CompactBlockServer::new(),
713 request_receiver,
714 backend,
715 metrics,
716 );
717
718 let mut protocol_config = ProtocolConfig {
719 name: SYNC_PROTOCOL.into(),
720 fallback_names: Vec::new(),
721 max_request_size: 1024 * 1024,
722 max_response_size: 16 * 1024 * 1024,
723 request_timeout: Duration::from_secs(20),
724 inbound_queue: None,
725 };
726 protocol_config.inbound_queue = Some(tx);
727
728 Ok(BlockRelayParams {
729 server: Box::new(relay_server),
730 downloader: Arc::new(relay_client),
731 request_response_config: protocol_config,
732 })
733}