1use crate::consensus::types::{
4 ConsensusClientMetrics, ConsensusRequest, ConsensusServerMetrics, FullDownloadRequest,
5 FullDownloadResponse, InitialRequest, InitialResponse, PartialBlock, ProtocolInitialRequest,
6 ProtocolInitialResponse, ProtocolMessage,
7};
8use crate::protocol::compact_block::{
9 CompactBlockClient, CompactBlockHandshake, CompactBlockServer,
10};
11use crate::protocol::{ClientBackend, ProtocolUnitInfo, ServerBackend};
12use crate::types::{RelayError, RequestResponseErr};
13use crate::utils::{NetworkPeerHandle, NetworkWrapper};
14use crate::LOG_TARGET;
15use async_trait::async_trait;
16use futures::channel::oneshot;
17use futures::stream::StreamExt;
18use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
19use sc_client_api::{BlockBackend, HeaderBackend};
20use sc_network::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
21use sc_network::types::ProtocolName;
22use sc_network::{NetworkWorker, OutboundFailure, PeerId, RequestFailure};
23use sc_network_common::sync::message::{
24 BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
25};
26use sc_network_sync::block_relay_protocol::{
27 BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer,
28};
29use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
30use sp_api::ProvideRuntimeApi;
31use sp_consensus_subspace::SubspaceApi;
32use sp_runtime::generic::BlockId;
33use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
34use std::fmt;
35use std::num::{NonZeroU32, NonZeroUsize};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use subspace_core_primitives::PublicKey;
39use subspace_runtime_primitives::{BlockHashFor, ExtrinsicFor};
40use substrate_prometheus_endpoint::{PrometheusError, Registry};
41use tracing::{debug, info, trace, warn};
42
43const SYNC_PROTOCOL: &str = "/subspace/consensus-block-relay/1";
44
45const NUM_PEER_HINT: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed");
47
48const MAX_RESPONSE_SIZE: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("Not zero; qed");
51const MAX_RESPONSE_BLOCKS: NonZeroU32 = NonZeroU32::new(128).expect("Not zero; qed");
53
54const TX_SIZE_THRESHOLD: NonZeroUsize = NonZeroUsize::new(32).expect("Not zero; qed");
57
58struct ConsensusRelayClient<Block, Pool>
60where
61 Block: BlockT,
62 Pool: TransactionPool,
63{
64 network: Arc<NetworkWrapper>,
65 protocol_name: ProtocolName,
66 compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
67 backend: Arc<ConsensusClientBackend<Pool>>,
68 metrics: ConsensusClientMetrics,
69 _phantom_data: std::marker::PhantomData<(Block, Pool)>,
70}
71
72impl<Block, Pool> fmt::Debug for ConsensusRelayClient<Block, Pool>
73where
74 Block: BlockT,
75 Pool: TransactionPool,
76{
77 #[inline]
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 f.debug_struct("ConsensusRelayClient")
80 .field("protocol_name", &self.protocol_name)
81 .finish_non_exhaustive()
82 }
83}
84
85impl<Block, Pool> ConsensusRelayClient<Block, Pool>
86where
87 Block: BlockT,
88 Pool: TransactionPool<Block = Block> + 'static,
89{
90 fn new(
92 network: Arc<NetworkWrapper>,
93 protocol_name: ProtocolName,
94 compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
95 backend: Arc<ConsensusClientBackend<Pool>>,
96 metrics: ConsensusClientMetrics,
97 ) -> Self {
98 Self {
99 network,
100 protocol_name,
101 compact_block,
102 backend,
103 metrics,
104 _phantom_data: Default::default(),
105 }
106 }
107
108 async fn download(
110 &self,
111 who: PeerId,
112 request: BlockRequest<Block>,
113 ) -> Result<Vec<BlockData<Block>>, RelayError> {
114 let start_ts = Instant::now();
115 let network_peer_handle = self
116 .network
117 .network_peer_handle(self.protocol_name.clone(), who)?;
118
119 let initial_request = InitialRequest {
121 from_block: match request.from {
122 FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
123 FromBlock::Number(n) => BlockId::<Block>::Number(n),
124 },
125 block_attributes: request.fields,
126 protocol_request: ProtocolInitialRequest::from(
127 self.compact_block
128 .build_initial_request(self.backend.as_ref()),
129 ),
130 };
131 let initial_response = network_peer_handle
132 .request::<_, InitialResponse<Block, TxHash<Pool>>>(ConsensusRequest::<
133 Block,
134 TxHash<Pool>,
135 >::from(initial_request))
136 .await?;
137
138 let (body, local_miss) = if let Some(protocol_response) = initial_response.protocol_response
140 {
141 let (body, local_miss) = self
142 .resolve_extrinsics::<ConsensusRequest<Block, TxHash<Pool>>>(
143 protocol_response,
144 &network_peer_handle,
145 )
146 .await?;
147 (Some(body), local_miss)
148 } else {
149 (None, 0)
150 };
151
152 let downloaded = vec![initial_response.partial_block.block_data(body)];
154 debug!(
155 target: LOG_TARGET,
156 block_hash = ?initial_response.block_hash,
157 download_bytes = %downloaded.encoded_size(),
158 %local_miss,
159 duration = ?start_ts.elapsed(),
160 "block_download",
161 );
162 Ok(downloaded)
163 }
164
165 async fn full_download(
167 &self,
168 who: PeerId,
169 request: BlockRequest<Block>,
170 ) -> Result<Vec<BlockData<Block>>, RelayError> {
171 let start_ts = Instant::now();
172 let network_peer_handle = self
173 .network
174 .network_peer_handle(self.protocol_name.clone(), who)?;
175
176 let server_request =
177 ConsensusRequest::<Block, TxHash<Pool>>::from(FullDownloadRequest(request.clone()));
178 let full_response = network_peer_handle
179 .request::<_, FullDownloadResponse<Block>>(server_request)
180 .await?;
181 let downloaded = full_response.0;
182
183 debug!(
184 target: LOG_TARGET,
185 ?request,
186 download_blocks = %downloaded.len(),
187 download_bytes = %downloaded.encoded_size(),
188 duration = ?start_ts.elapsed(),
189 "full_download",
190 );
191 Ok(downloaded)
192 }
193
194 async fn resolve_extrinsics<Request>(
196 &self,
197 protocol_response: ProtocolInitialResponse<Block, TxHash<Pool>>,
198 network_peer_handle: &NetworkPeerHandle,
199 ) -> Result<(Vec<ExtrinsicFor<Block>>, usize), RelayError>
200 where
201 Request:
202 From<CompactBlockHandshake<BlockHashFor<Block>, TxHash<Pool>>> + Encode + Send + Sync,
203 {
204 let ProtocolInitialResponse::CompactBlock(compact_response) = protocol_response;
205 let (block_hash, resolved) = self
206 .compact_block
207 .resolve_initial_response::<Request>(
208 compact_response,
209 network_peer_handle,
210 self.backend.as_ref(),
211 )
212 .await?;
213 let mut local_miss = 0;
214 let extrinsics = resolved
215 .into_iter()
216 .map(|entry| {
217 let encoded = entry.protocol_unit.encode();
218 if !entry.locally_resolved {
219 trace!(
220 target: LOG_TARGET,
221 ?block_hash,
222 tx_hash = ?entry.protocol_unit_id,
223 tx_size = %encoded.len(),
224 "resolve_extrinsics: local miss"
225 );
226 self.metrics.tx_pool_miss.inc();
227 local_miss += encoded.len();
228 }
229 entry.protocol_unit
230 })
231 .collect();
232 Ok((extrinsics, local_miss))
233 }
234}
235
236#[async_trait]
237impl<Block, Pool> BlockDownloader<Block> for ConsensusRelayClient<Block, Pool>
238where
239 Block: BlockT,
240 Pool: TransactionPool<Block = Block> + 'static,
241{
242 fn protocol_name(&self) -> &ProtocolName {
243 &self.protocol_name
244 }
245
246 async fn download_blocks(
247 &self,
248 who: PeerId,
249 request: BlockRequest<Block>,
250 ) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled> {
251 let full_download = request.max.is_some_and(|max_blocks| max_blocks > 1);
252 let ret = if full_download {
253 self.full_download(who, request.clone()).await
254 } else {
255 self.download(who, request.clone()).await
256 };
257 match ret {
258 Ok(blocks) => {
259 self.metrics.on_download::<Block>(&blocks);
260 Ok(Ok((blocks.encode(), self.protocol_name.clone())))
261 }
262 Err(error) => {
263 debug!(
264 target: LOG_TARGET,
265 peer=?who,
266 ?request,
267 ?error,
268 "download_block failed"
269 );
270 self.metrics.on_download_fail(&error);
271 match error {
272 RelayError::RequestResponse(error) => match error {
273 RequestResponseErr::DecodeFailed { .. } => {
274 Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
275 }
276 RequestResponseErr::RequestFailure(err) => Ok(Err(err)),
277 RequestResponseErr::NetworkUninitialized => {
278 Ok(Err(RequestFailure::NotConnected))
280 }
281 RequestResponseErr::Canceled => Err(oneshot::Canceled),
282 },
283 _ => {
284 Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
286 }
287 }
288 }
289 }
290 }
291
292 fn block_response_into_blocks(
293 &self,
294 _request: &BlockRequest<Block>,
295 response: Vec<u8>,
296 ) -> Result<Vec<BlockData<Block>>, BlockResponseError> {
297 Decode::decode(&mut response.as_ref()).map_err(|err| {
298 BlockResponseError::DecodeFailed(format!(
299 "Failed to decode consensus response: {err:?}"
300 ))
301 })
302 }
303}
304
305struct ConsensusRelayServer<Block: BlockT, Client, Pool: TransactionPool> {
307 client: Arc<Client>,
308 compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
309 request_receiver: async_channel::Receiver<IncomingRequest>,
310 backend: Arc<ConsensusServerBackend<Client, Pool>>,
311 metrics: ConsensusServerMetrics,
312 _block: std::marker::PhantomData<Block>,
313}
314
315impl<Block, Client, Pool> ConsensusRelayServer<Block, Client, Pool>
316where
317 Block: BlockT,
318 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
319 Client::Api: SubspaceApi<Block, PublicKey>,
320 Pool: TransactionPool<Block = Block> + 'static,
321{
322 fn new(
324 client: Arc<Client>,
325 compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
326 request_receiver: async_channel::Receiver<IncomingRequest>,
327 backend: Arc<ConsensusServerBackend<Client, Pool>>,
328 metrics: ConsensusServerMetrics,
329 ) -> Self {
330 Self {
331 client,
332 compact_block,
333 request_receiver,
334 backend,
335 metrics,
336 _block: Default::default(),
337 }
338 }
339
340 async fn process_incoming_request(&self, request: IncomingRequest) {
342 let IncomingRequest {
345 peer,
346 payload,
347 pending_response,
348 } = request;
349 let req: ConsensusRequest<Block, TxHash<Pool>> = match Decode::decode(&mut payload.as_ref())
350 {
351 Ok(msg) => msg,
352 Err(err) => {
353 warn!(
354 target: LOG_TARGET,
355 ?peer,
356 ?err,
357 "Decode failed"
358 );
359 return;
360 }
361 };
362
363 let ret = match req {
364 ConsensusRequest::BlockDownloadV0(req) => {
365 self.on_initial_request(req).map(|rsp| rsp.encode())
366 }
367 ConsensusRequest::ProtocolMessageV0(msg) => self.on_protocol_message(msg),
368 ConsensusRequest::FullBlockDownloadV0(req) => {
369 self.on_full_download_request(req).map(|rsp| rsp.encode())
370 }
371 };
372
373 match ret {
374 Ok(response) => {
375 self.metrics.on_request();
376 self.send_response(peer, response, pending_response);
377 trace!(
378 target: LOG_TARGET,
379 ?peer,
380 "server: request processed from"
381 );
382 }
383 Err(error) => {
384 self.metrics.on_failed_request(&error);
385 debug!(
386 target: LOG_TARGET,
387 ?peer,
388 ?error,
389 "Server error"
390 );
391 }
392 }
393 }
394
395 fn on_initial_request(
397 &self,
398 initial_request: InitialRequest<Block>,
399 ) -> Result<InitialResponse<Block, TxHash<Pool>>, RelayError> {
400 let block_hash = self.block_hash(&initial_request.from_block)?;
401 let block_attributes = initial_request.block_attributes;
402
403 let partial_block = self.get_partial_block(&block_hash, block_attributes)?;
405 let ProtocolInitialRequest::CompactBlock(compact_request) =
406 initial_request.protocol_request;
407 let protocol_response = if block_attributes.contains(BlockAttributes::BODY) {
408 let compact_response = self.compact_block.build_initial_response(
409 &block_hash,
410 compact_request,
411 self.backend.as_ref(),
412 )?;
413 Some(ProtocolInitialResponse::from(compact_response))
414 } else {
415 None
416 };
417
418 Ok(InitialResponse {
419 block_hash,
420 partial_block,
421 protocol_response,
422 })
423 }
424
425 fn on_protocol_message(
427 &self,
428 msg: ProtocolMessage<Block, TxHash<Pool>>,
429 ) -> Result<Vec<u8>, RelayError> {
430 let response = match msg {
431 ProtocolMessage::CompactBlock(msg) => self
432 .compact_block
433 .on_protocol_message(msg, self.backend.as_ref())?
434 .encode(),
435 };
436 Ok(response)
437 }
438
439 fn on_full_download_request(
441 &self,
442 full_download_request: FullDownloadRequest<Block>,
443 ) -> Result<FullDownloadResponse<Block>, RelayError> {
444 let block_request = full_download_request.0;
445 let mut blocks = Vec::new();
446 let mut block_id = match block_request.from {
447 FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
448 FromBlock::Number(n) => BlockId::<Block>::Number(n),
449 };
450
451 let mut total_size = Compact::compact_len(&MAX_RESPONSE_BLOCKS.get());
453 let max_blocks = block_request.max.map_or(MAX_RESPONSE_BLOCKS.into(), |val| {
454 std::cmp::min(val, MAX_RESPONSE_BLOCKS.into())
455 });
456 while blocks.len() < max_blocks as usize {
457 let block_hash = self.block_hash(&block_id)?;
458 let partial_block = self.get_partial_block(&block_hash, block_request.fields)?;
459 let body = if block_request.fields.contains(BlockAttributes::BODY) {
460 Some(block_transactions(&block_hash, self.client.as_ref())?)
461 } else {
462 None
463 };
464 let block_number = partial_block.block_number;
465 let parent_hash = partial_block.parent_hash;
466 let block_data = partial_block.block_data(body);
467
468 let bytes = block_data.encoded_size();
470 if !blocks.is_empty() && (total_size + bytes) > MAX_RESPONSE_SIZE.into() {
471 break;
472 }
473 total_size += bytes;
474 blocks.push(block_data);
475
476 block_id = match block_request.direction {
477 Direction::Ascending => BlockId::Number(block_number + One::one()),
478 Direction::Descending => {
479 if block_number.is_zero() {
480 break;
481 }
482 BlockId::Hash(parent_hash)
483 }
484 };
485 }
486
487 Ok(FullDownloadResponse(blocks))
488 }
489
490 fn get_partial_block(
492 &self,
493 block_hash: &BlockHashFor<Block>,
494 block_attributes: BlockAttributes,
495 ) -> Result<PartialBlock<Block>, RelayError> {
496 let block_header = match self.client.header(*block_hash) {
497 Ok(Some(header)) => header,
498 Ok(None) => {
499 return Err(RelayError::BlockHeader(format!(
500 "Missing header: {block_hash:?}"
501 )))
502 }
503 Err(err) => return Err(RelayError::BlockHeader(format!("{block_hash:?}, {err:?}"))),
504 };
505 let parent_hash = *block_header.parent_hash();
506 let block_number = *block_header.number();
507 let block_header_hash = block_header.hash();
508
509 let header = if block_attributes.contains(BlockAttributes::HEADER) {
510 Some(block_header)
511 } else {
512 None
513 };
514
515 let indexed_body = if block_attributes.contains(BlockAttributes::INDEXED_BODY) {
516 self.client
517 .block_indexed_body(*block_hash)
518 .map_err(|err| RelayError::BlockIndexedBody(format!("{block_hash:?}, {err:?}")))?
519 } else {
520 None
521 };
522
523 let justifications = if block_attributes.contains(BlockAttributes::JUSTIFICATION) {
524 self.client.justifications(*block_hash).map_err(|err| {
525 RelayError::BlockJustifications(format!("{block_hash:?}, {err:?}"))
526 })?
527 } else {
528 None
529 };
530
531 Ok(PartialBlock {
532 parent_hash,
533 block_number,
534 block_header_hash,
535 header,
536 indexed_body,
537 justifications,
538 })
539 }
540
541 fn block_hash(&self, block_id: &BlockId<Block>) -> Result<BlockHashFor<Block>, RelayError> {
543 match self.client.block_hash_from_id(block_id) {
544 Ok(Some(hash)) => Ok(hash),
545 Ok(None) => Err(RelayError::BlockHash(format!("Missing: {block_id:?}"))),
546 Err(err) => Err(RelayError::BlockHash(format!("{block_id:?}, {err:?}"))),
547 }
548 }
549
550 fn send_response(
552 &self,
553 peer: PeerId,
554 response: Vec<u8>,
555 sender: oneshot::Sender<OutgoingResponse>,
556 ) {
557 let response = OutgoingResponse {
558 result: Ok(response),
559 reputation_changes: Vec::new(),
560 sent_feedback: None,
561 };
562 if sender.send(response).is_err() {
563 warn!(
564 target: LOG_TARGET,
565 ?peer,
566 "Failed to send response"
567 );
568 }
569 }
570}
571
572#[async_trait]
573impl<Block, Client, Pool> BlockServer<Block> for ConsensusRelayServer<Block, Client, Pool>
574where
575 Block: BlockT,
576 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
577 Client::Api: SubspaceApi<Block, PublicKey>,
578 Pool: TransactionPool<Block = Block> + 'static,
579{
580 async fn run(&mut self) {
581 info!(
582 target: LOG_TARGET,
583 "relay::consensus block server: starting"
584 );
585 while let Some(request) = self.request_receiver.next().await {
586 self.process_incoming_request(request).await;
587 }
588 }
589}
590
591struct ConsensusClientBackend<Pool> {
593 transaction_pool: Arc<Pool>,
594}
595
596impl<Block, Pool> ClientBackend<TxHash<Pool>, ExtrinsicFor<Block>> for ConsensusClientBackend<Pool>
597where
598 Block: BlockT,
599 Pool: TransactionPool<Block = Block> + 'static,
600{
601 fn protocol_unit(&self, tx_hash: &TxHash<Pool>) -> Option<ExtrinsicFor<Block>> {
602 self.transaction_pool
604 .ready_transaction(tx_hash)
605 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
606 }
607}
608
609struct ConsensusServerBackend<Client, Pool> {
611 client: Arc<Client>,
612 transaction_pool: Arc<Pool>,
613}
614
615impl<Block, Client, Pool> ServerBackend<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>
616 for ConsensusServerBackend<Client, Pool>
617where
618 Block: BlockT,
619 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
620 Client::Api: SubspaceApi<Block, PublicKey>,
621 Pool: TransactionPool<Block = Block> + 'static,
622{
623 fn download_unit_members(
624 &self,
625 block_hash: &BlockHashFor<Block>,
626 ) -> Result<Vec<ProtocolUnitInfo<TxHash<Pool>, ExtrinsicFor<Block>>>, RelayError> {
627 let txns = block_transactions(block_hash, self.client.as_ref())?;
628 Ok(txns
629 .into_iter()
630 .map(|extrinsic| {
631 let send_tx = extrinsic.encoded_size() <= TX_SIZE_THRESHOLD.get()
632 || self
633 .client
634 .runtime_api()
635 .is_inherent(*block_hash, &extrinsic)
636 .unwrap_or(false);
637 ProtocolUnitInfo {
638 id: self.transaction_pool.hash_of(&extrinsic),
639 unit: if send_tx { Some(extrinsic) } else { None },
640 }
641 })
642 .collect())
643 }
644
645 fn protocol_unit(
646 &self,
647 block_hash: &BlockHashFor<Block>,
648 tx_hash: &TxHash<Pool>,
649 ) -> Option<ExtrinsicFor<Block>> {
650 match block_transactions(block_hash, self.client.as_ref()) {
652 Ok(extrinsics) => {
653 for extrinsic in extrinsics {
654 if self.transaction_pool.hash_of(&extrinsic) == *tx_hash {
655 return Some(extrinsic);
656 }
657 }
658 }
659 Err(err) => {
660 debug!(
661 target: LOG_TARGET,
662 ?block_hash,
663 ?tx_hash,
664 ?err,
665 "consensus server protocol_unit: "
666 );
667 }
668 }
669
670 self.transaction_pool
672 .ready_transaction(tx_hash)
673 .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
674 }
675}
676
677fn block_transactions<Block, Client>(
679 block_hash: &BlockHashFor<Block>,
680 client: &Client,
681) -> Result<Vec<ExtrinsicFor<Block>>, RelayError>
682where
683 Block: BlockT,
684 Client: HeaderBackend<Block> + BlockBackend<Block>,
685{
686 let maybe_extrinsics = client
687 .block_body(*block_hash)
688 .map_err(|err| RelayError::BlockBody(format!("{block_hash:?}, {err:?}")))?;
689 maybe_extrinsics.ok_or(RelayError::BlockExtrinsicsNotFound(format!(
690 "{block_hash:?}"
691 )))
692}
693
694#[derive(Debug, thiserror::Error)]
695pub enum BlockRelayConfigurationError {
696 #[error(transparent)]
697 PrometheusError(#[from] PrometheusError),
698}
699
700pub fn build_consensus_relay<Block, Client, Pool>(
702 network: Arc<NetworkWrapper>,
703 client: Arc<Client>,
704 pool: Arc<Pool>,
705 registry: Option<&Registry>,
706) -> Result<
707 BlockRelayParams<Block, NetworkWorker<Block, BlockHashFor<Block>>>,
708 BlockRelayConfigurationError,
709>
710where
711 Block: BlockT,
712 Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block> + 'static,
713 Client::Api: SubspaceApi<Block, PublicKey>,
714 Pool: TransactionPool<Block = Block> + 'static,
715{
716 let (tx, request_receiver) = async_channel::bounded(NUM_PEER_HINT.get());
717
718 let backend = Arc::new(ConsensusClientBackend {
719 transaction_pool: pool.clone(),
720 });
721 let metrics = ConsensusClientMetrics::new(registry)
722 .map_err(BlockRelayConfigurationError::PrometheusError)?;
723 let relay_client: ConsensusRelayClient<Block, Pool> = ConsensusRelayClient::new(
724 network,
725 SYNC_PROTOCOL.into(),
726 CompactBlockClient::new(),
727 backend,
728 metrics,
729 );
730
731 let backend = Arc::new(ConsensusServerBackend {
732 client: client.clone(),
733 transaction_pool: pool.clone(),
734 });
735 let metrics = ConsensusServerMetrics::new(registry)
736 .map_err(BlockRelayConfigurationError::PrometheusError)?;
737 let relay_server = ConsensusRelayServer::new(
738 client,
739 CompactBlockServer::new(),
740 request_receiver,
741 backend,
742 metrics,
743 );
744
745 let mut protocol_config = ProtocolConfig {
746 name: SYNC_PROTOCOL.into(),
747 fallback_names: Vec::new(),
748 max_request_size: 1024 * 1024,
749 max_response_size: 16 * 1024 * 1024,
750 request_timeout: Duration::from_secs(20),
751 inbound_queue: None,
752 };
753 protocol_config.inbound_queue = Some(tx);
754
755 Ok(BlockRelayParams {
756 server: Box::new(relay_server),
757 downloader: Arc::new(relay_client),
758 request_response_config: protocol_config,
759 })
760}