sc_subspace_block_relay/consensus/
relay.rs

1//! Consensus block relay implementation.
2
3use crate::consensus::types::{
4    ConsensusClientMetrics, ConsensusRequest, ConsensusServerMetrics, FullDownloadRequest,
5    FullDownloadResponse, InitialRequest, InitialResponse, PartialBlock, ProtocolInitialRequest,
6    ProtocolInitialResponse, ProtocolMessage,
7};
8use crate::protocol::compact_block::{
9    CompactBlockClient, CompactBlockHandshake, CompactBlockServer,
10};
11use crate::protocol::{ClientBackend, ProtocolUnitInfo, ServerBackend};
12use crate::types::{RelayError, RequestResponseErr};
13use crate::utils::{NetworkPeerHandle, NetworkWrapper};
14use async_trait::async_trait;
15use futures::channel::oneshot;
16use futures::stream::StreamExt;
17use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
18use sc_client_api::{BlockBackend, HeaderBackend};
19use sc_network::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
20use sc_network::types::ProtocolName;
21use sc_network::{NetworkWorker, OutboundFailure, PeerId, RequestFailure};
22use sc_network_common::sync::message::{
23    BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
24};
25use sc_network_sync::block_relay_protocol::{
26    BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer,
27};
28use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
29use sp_api::ProvideRuntimeApi;
30use sp_consensus_subspace::SubspaceApi;
31use sp_runtime::generic::BlockId;
32use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
33use std::fmt;
34use std::num::{NonZeroU32, NonZeroUsize};
35use std::sync::Arc;
36use std::time::{Duration, Instant};
37use subspace_core_primitives::PublicKey;
38use subspace_runtime_primitives::{BlockHashFor, ExtrinsicFor};
39use substrate_prometheus_endpoint::{PrometheusError, Registry};
40use tracing::{debug, info, trace, warn};
41
42const SYNC_PROTOCOL: &str = "/subspace/consensus-block-relay/1";
43
44// TODO: size these properly, or move to config
45const NUM_PEER_HINT: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed");
46
47/// These are the same limits used by substrate block handler.
48/// Maximum response size (bytes).
49const MAX_RESPONSE_SIZE: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("Not zero; qed");
50/// Maximum blocks in the response.
51const MAX_RESPONSE_BLOCKS: NonZeroU32 = NonZeroU32::new(128).expect("Not zero; qed");
52
53/// If the encoded size of the extrinsic is less than the threshold,
54/// return the full extrinsic along with the tx hash.
55const TX_SIZE_THRESHOLD: NonZeroUsize = NonZeroUsize::new(32).expect("Not zero; qed");
56
57/// The client side of the consensus block relay
58struct ConsensusRelayClient<Block, Pool>
59where
60    Block: BlockT,
61    Pool: TransactionPool,
62{
63    network: Arc<NetworkWrapper>,
64    protocol_name: ProtocolName,
65    compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
66    backend: Arc<ConsensusClientBackend<Pool>>,
67    metrics: ConsensusClientMetrics,
68    _phantom_data: std::marker::PhantomData<(Block, Pool)>,
69}
70
71impl<Block, Pool> fmt::Debug for ConsensusRelayClient<Block, Pool>
72where
73    Block: BlockT,
74    Pool: TransactionPool,
75{
76    #[inline]
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("ConsensusRelayClient")
79            .field("protocol_name", &self.protocol_name)
80            .finish_non_exhaustive()
81    }
82}
83
84impl<Block, Pool> ConsensusRelayClient<Block, Pool>
85where
86    Block: BlockT,
87    Pool: TransactionPool<Block = Block> + 'static,
88{
89    /// Creates the consensus relay client.
90    fn new(
91        network: Arc<NetworkWrapper>,
92        protocol_name: ProtocolName,
93        compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
94        backend: Arc<ConsensusClientBackend<Pool>>,
95        metrics: ConsensusClientMetrics,
96    ) -> Self {
97        Self {
98            network,
99            protocol_name,
100            compact_block,
101            backend,
102            metrics,
103            _phantom_data: Default::default(),
104        }
105    }
106
107    /// Downloads the requested block from the peer using the relay protocol.
108    async fn download(
109        &self,
110        who: PeerId,
111        request: BlockRequest<Block>,
112    ) -> Result<Vec<BlockData<Block>>, RelayError> {
113        let start_ts = Instant::now();
114        let network_peer_handle = self
115            .network
116            .network_peer_handle(self.protocol_name.clone(), who)?;
117
118        // Perform the initial request/response
119        let initial_request = InitialRequest {
120            from_block: match request.from {
121                FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
122                FromBlock::Number(n) => BlockId::<Block>::Number(n),
123            },
124            block_attributes: request.fields,
125            protocol_request: ProtocolInitialRequest::from(
126                self.compact_block
127                    .build_initial_request(self.backend.as_ref()),
128            ),
129        };
130        let initial_response = network_peer_handle
131            .request::<_, InitialResponse<Block, TxHash<Pool>>>(ConsensusRequest::<
132                Block,
133                TxHash<Pool>,
134            >::from(initial_request))
135            .await?;
136
137        // Resolve the protocol response to get the extrinsics
138        let (body, local_miss) = if let Some(protocol_response) = initial_response.protocol_response
139        {
140            let (body, local_miss) = self
141                .resolve_extrinsics::<ConsensusRequest<Block, TxHash<Pool>>>(
142                    protocol_response,
143                    &network_peer_handle,
144                )
145                .await?;
146            (Some(body), local_miss)
147        } else {
148            (None, 0)
149        };
150
151        // Assemble the final response
152        let downloaded = vec![initial_response.partial_block.block_data(body)];
153        debug!(
154            block_hash = ?initial_response.block_hash,
155            download_bytes = %downloaded.encoded_size(),
156            %local_miss,
157            duration = ?start_ts.elapsed(),
158            "block_download",
159        );
160        Ok(downloaded)
161    }
162
163    /// Downloads the requested blocks from the peer, without using the relay protocol.
164    async fn full_download(
165        &self,
166        who: PeerId,
167        request: BlockRequest<Block>,
168    ) -> Result<Vec<BlockData<Block>>, RelayError> {
169        let start_ts = Instant::now();
170        let network_peer_handle = self
171            .network
172            .network_peer_handle(self.protocol_name.clone(), who)?;
173
174        let server_request =
175            ConsensusRequest::<Block, TxHash<Pool>>::from(FullDownloadRequest(request.clone()));
176        let full_response = network_peer_handle
177            .request::<_, FullDownloadResponse<Block>>(server_request)
178            .await?;
179        let downloaded = full_response.0;
180
181        debug!(
182            ?request,
183            download_blocks =  %downloaded.len(),
184            download_bytes = %downloaded.encoded_size(),
185            duration = ?start_ts.elapsed(),
186            "full_download",
187        );
188        Ok(downloaded)
189    }
190
191    /// Resolves the extrinsics from the initial response
192    async fn resolve_extrinsics<Request>(
193        &self,
194        protocol_response: ProtocolInitialResponse<Block, TxHash<Pool>>,
195        network_peer_handle: &NetworkPeerHandle,
196    ) -> Result<(Vec<ExtrinsicFor<Block>>, usize), RelayError>
197    where
198        Request:
199            From<CompactBlockHandshake<BlockHashFor<Block>, TxHash<Pool>>> + Encode + Send + Sync,
200    {
201        let ProtocolInitialResponse::CompactBlock(compact_response) = protocol_response;
202        let (block_hash, resolved) = self
203            .compact_block
204            .resolve_initial_response::<Request>(
205                compact_response,
206                network_peer_handle,
207                self.backend.as_ref(),
208            )
209            .await?;
210        let mut local_miss = 0;
211        let extrinsics = resolved
212            .into_iter()
213            .map(|entry| {
214                let encoded = entry.protocol_unit.encode();
215                if !entry.locally_resolved {
216                    trace!(
217                        ?block_hash,
218                        tx_hash = ?entry.protocol_unit_id,
219                        tx_size = %encoded.len(),
220                        "resolve_extrinsics: local miss"
221                    );
222                    self.metrics.tx_pool_miss.inc();
223                    local_miss += encoded.len();
224                }
225                entry.protocol_unit
226            })
227            .collect();
228        Ok((extrinsics, local_miss))
229    }
230}
231
232#[async_trait]
233impl<Block, Pool> BlockDownloader<Block> for ConsensusRelayClient<Block, Pool>
234where
235    Block: BlockT,
236    Pool: TransactionPool<Block = Block> + 'static,
237{
238    fn protocol_name(&self) -> &ProtocolName {
239        &self.protocol_name
240    }
241
242    async fn download_blocks(
243        &self,
244        who: PeerId,
245        request: BlockRequest<Block>,
246    ) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled> {
247        let full_download = request.max.is_some_and(|max_blocks| max_blocks > 1);
248        let ret = if full_download {
249            self.full_download(who, request.clone()).await
250        } else {
251            self.download(who, request.clone()).await
252        };
253        match ret {
254            Ok(blocks) => {
255                self.metrics.on_download::<Block>(&blocks);
256                Ok(Ok((blocks.encode(), self.protocol_name.clone())))
257            }
258            Err(error) => {
259                debug!(
260                    peer=?who,
261                    ?request,
262                    ?error,
263                    "download_block failed"
264                );
265                self.metrics.on_download_fail(&error);
266                match error {
267                    RelayError::RequestResponse(error) => match error {
268                        RequestResponseErr::DecodeFailed { .. } => {
269                            Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
270                        }
271                        RequestResponseErr::RequestFailure(err) => Ok(Err(err)),
272                        RequestResponseErr::NetworkUninitialized => {
273                            // TODO: This is the best error found that kind of matches
274                            Ok(Err(RequestFailure::NotConnected))
275                        }
276                        RequestResponseErr::Canceled => Err(oneshot::Canceled),
277                    },
278                    _ => {
279                        // Why timeout???
280                        Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
281                    }
282                }
283            }
284        }
285    }
286
287    fn block_response_into_blocks(
288        &self,
289        _request: &BlockRequest<Block>,
290        response: Vec<u8>,
291    ) -> Result<Vec<BlockData<Block>>, BlockResponseError> {
292        Decode::decode(&mut response.as_ref()).map_err(|err| {
293            BlockResponseError::DecodeFailed(format!(
294                "Failed to decode consensus response: {err:?}"
295            ))
296        })
297    }
298}
299
300/// The server side of the consensus block relay
301struct ConsensusRelayServer<Block: BlockT, Client, Pool: TransactionPool> {
302    client: Arc<Client>,
303    compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
304    request_receiver: async_channel::Receiver<IncomingRequest>,
305    backend: Arc<ConsensusServerBackend<Client, Pool>>,
306    metrics: ConsensusServerMetrics,
307    _block: std::marker::PhantomData<Block>,
308}
309
310impl<Block, Client, Pool> ConsensusRelayServer<Block, Client, Pool>
311where
312    Block: BlockT,
313    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
314    Client::Api: SubspaceApi<Block, PublicKey>,
315    Pool: TransactionPool<Block = Block> + 'static,
316{
317    /// Creates the consensus relay server.
318    fn new(
319        client: Arc<Client>,
320        compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
321        request_receiver: async_channel::Receiver<IncomingRequest>,
322        backend: Arc<ConsensusServerBackend<Client, Pool>>,
323        metrics: ConsensusServerMetrics,
324    ) -> Self {
325        Self {
326            client,
327            compact_block,
328            request_receiver,
329            backend,
330            metrics,
331            _block: Default::default(),
332        }
333    }
334
335    /// Handles the received request from the client side
336    async fn process_incoming_request(&self, request: IncomingRequest) {
337        // Drop the request in case of errors and let the client time out.
338        // This is the behavior of the current substrate block handler.
339        let IncomingRequest {
340            peer,
341            payload,
342            pending_response,
343        } = request;
344        let req: ConsensusRequest<Block, TxHash<Pool>> = match Decode::decode(&mut payload.as_ref())
345        {
346            Ok(msg) => msg,
347            Err(err) => {
348                warn!(?peer, ?err, "Decode failed");
349                return;
350            }
351        };
352
353        let ret = match req {
354            ConsensusRequest::BlockDownloadV0(req) => {
355                self.on_initial_request(req).map(|rsp| rsp.encode())
356            }
357            ConsensusRequest::ProtocolMessageV0(msg) => self.on_protocol_message(msg),
358            ConsensusRequest::FullBlockDownloadV0(req) => {
359                self.on_full_download_request(req).map(|rsp| rsp.encode())
360            }
361        };
362
363        match ret {
364            Ok(response) => {
365                self.metrics.on_request();
366                self.send_response(peer, response, pending_response);
367                trace!(?peer, "server: request processed from");
368            }
369            Err(error) => {
370                self.metrics.on_failed_request(&error);
371                debug!(?peer, ?error, "Server error");
372            }
373        }
374    }
375
376    /// Handles the initial request from the client
377    fn on_initial_request(
378        &self,
379        initial_request: InitialRequest<Block>,
380    ) -> Result<InitialResponse<Block, TxHash<Pool>>, RelayError> {
381        let block_hash = self.block_hash(&initial_request.from_block)?;
382        let block_attributes = initial_request.block_attributes;
383
384        // Build the generic and the protocol specific parts of the response
385        let partial_block = self.get_partial_block(&block_hash, block_attributes)?;
386        let ProtocolInitialRequest::CompactBlock(compact_request) =
387            initial_request.protocol_request;
388        let protocol_response = if block_attributes.contains(BlockAttributes::BODY) {
389            let compact_response = self.compact_block.build_initial_response(
390                &block_hash,
391                compact_request,
392                self.backend.as_ref(),
393            )?;
394            Some(ProtocolInitialResponse::from(compact_response))
395        } else {
396            None
397        };
398
399        Ok(InitialResponse {
400            block_hash,
401            partial_block,
402            protocol_response,
403        })
404    }
405
406    /// Handles the protocol message from the client
407    fn on_protocol_message(
408        &self,
409        msg: ProtocolMessage<Block, TxHash<Pool>>,
410    ) -> Result<Vec<u8>, RelayError> {
411        let response = match msg {
412            ProtocolMessage::CompactBlock(msg) => self
413                .compact_block
414                .on_protocol_message(msg, self.backend.as_ref())?
415                .encode(),
416        };
417        Ok(response)
418    }
419
420    /// Handles the full download request from the client
421    fn on_full_download_request(
422        &self,
423        full_download_request: FullDownloadRequest<Block>,
424    ) -> Result<FullDownloadResponse<Block>, RelayError> {
425        let block_request = full_download_request.0;
426        let mut blocks = Vec::new();
427        let mut block_id = match block_request.from {
428            FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
429            FromBlock::Number(n) => BlockId::<Block>::Number(n),
430        };
431
432        // This is a compact length encoding of max number of blocks to be sent in response
433        let mut total_size = Compact::compact_len(&MAX_RESPONSE_BLOCKS.get());
434        let max_blocks = block_request.max.map_or(MAX_RESPONSE_BLOCKS.into(), |val| {
435            std::cmp::min(val, MAX_RESPONSE_BLOCKS.into())
436        });
437        while blocks.len() < max_blocks as usize {
438            let block_hash = self.block_hash(&block_id)?;
439            let partial_block = self.get_partial_block(&block_hash, block_request.fields)?;
440            let body = if block_request.fields.contains(BlockAttributes::BODY) {
441                Some(block_transactions(&block_hash, self.client.as_ref())?)
442            } else {
443                None
444            };
445            let block_number = partial_block.block_number;
446            let parent_hash = partial_block.parent_hash;
447            let block_data = partial_block.block_data(body);
448
449            // Enforce the max limit on response size.
450            let bytes = block_data.encoded_size();
451            if !blocks.is_empty() && (total_size + bytes) > MAX_RESPONSE_SIZE.into() {
452                break;
453            }
454            total_size += bytes;
455            blocks.push(block_data);
456
457            block_id = match block_request.direction {
458                Direction::Ascending => BlockId::Number(block_number + One::one()),
459                Direction::Descending => {
460                    if block_number.is_zero() {
461                        break;
462                    }
463                    BlockId::Hash(parent_hash)
464                }
465            };
466        }
467
468        Ok(FullDownloadResponse(blocks))
469    }
470
471    /// Builds the partial block response
472    fn get_partial_block(
473        &self,
474        block_hash: &BlockHashFor<Block>,
475        block_attributes: BlockAttributes,
476    ) -> Result<PartialBlock<Block>, RelayError> {
477        let block_header = match self.client.header(*block_hash) {
478            Ok(Some(header)) => header,
479            Ok(None) => {
480                return Err(RelayError::BlockHeader(format!(
481                    "Missing header: {block_hash:?}"
482                )));
483            }
484            Err(err) => return Err(RelayError::BlockHeader(format!("{block_hash:?}, {err:?}"))),
485        };
486        let parent_hash = *block_header.parent_hash();
487        let block_number = *block_header.number();
488        let block_header_hash = block_header.hash();
489
490        let header = if block_attributes.contains(BlockAttributes::HEADER) {
491            Some(block_header)
492        } else {
493            None
494        };
495
496        let indexed_body = if block_attributes.contains(BlockAttributes::INDEXED_BODY) {
497            self.client
498                .block_indexed_body(*block_hash)
499                .map_err(|err| RelayError::BlockIndexedBody(format!("{block_hash:?}, {err:?}")))?
500        } else {
501            None
502        };
503
504        let justifications = if block_attributes.contains(BlockAttributes::JUSTIFICATION) {
505            self.client.justifications(*block_hash).map_err(|err| {
506                RelayError::BlockJustifications(format!("{block_hash:?}, {err:?}"))
507            })?
508        } else {
509            None
510        };
511
512        Ok(PartialBlock {
513            parent_hash,
514            block_number,
515            block_header_hash,
516            header,
517            indexed_body,
518            justifications,
519        })
520    }
521
522    /// Converts the BlockId to block hash
523    fn block_hash(&self, block_id: &BlockId<Block>) -> Result<BlockHashFor<Block>, RelayError> {
524        match self.client.block_hash_from_id(block_id) {
525            Ok(Some(hash)) => Ok(hash),
526            Ok(None) => Err(RelayError::BlockHash(format!("Missing: {block_id:?}"))),
527            Err(err) => Err(RelayError::BlockHash(format!("{block_id:?}, {err:?}"))),
528        }
529    }
530
531    /// Builds/sends the response back to the client
532    fn send_response(
533        &self,
534        peer: PeerId,
535        response: Vec<u8>,
536        sender: oneshot::Sender<OutgoingResponse>,
537    ) {
538        let response = OutgoingResponse {
539            result: Ok(response),
540            reputation_changes: Vec::new(),
541            sent_feedback: None,
542        };
543        if sender.send(response).is_err() {
544            warn!(?peer, "Failed to send response");
545        }
546    }
547}
548
549#[async_trait]
550impl<Block, Client, Pool> BlockServer<Block> for ConsensusRelayServer<Block, Client, Pool>
551where
552    Block: BlockT,
553    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
554    Client::Api: SubspaceApi<Block, PublicKey>,
555    Pool: TransactionPool<Block = Block> + 'static,
556{
557    async fn run(&mut self) {
558        info!("relay::consensus block server: starting");
559        while let Some(request) = self.request_receiver.next().await {
560            self.process_incoming_request(request).await;
561        }
562    }
563}
564
565/// The client backend.
566struct ConsensusClientBackend<Pool> {
567    transaction_pool: Arc<Pool>,
568}
569
570impl<Block, Pool> ClientBackend<TxHash<Pool>, ExtrinsicFor<Block>> for ConsensusClientBackend<Pool>
571where
572    Block: BlockT,
573    Pool: TransactionPool<Block = Block> + 'static,
574{
575    fn protocol_unit(&self, tx_hash: &TxHash<Pool>) -> Option<ExtrinsicFor<Block>> {
576        // Look up the transaction pool.
577        self.transaction_pool
578            .ready_transaction(tx_hash)
579            .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
580    }
581}
582
583/// The server backend.
584struct ConsensusServerBackend<Client, Pool> {
585    client: Arc<Client>,
586    transaction_pool: Arc<Pool>,
587}
588
589impl<Block, Client, Pool> ServerBackend<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>
590    for ConsensusServerBackend<Client, Pool>
591where
592    Block: BlockT,
593    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
594    Client::Api: SubspaceApi<Block, PublicKey>,
595    Pool: TransactionPool<Block = Block> + 'static,
596{
597    fn download_unit_members(
598        &self,
599        block_hash: &BlockHashFor<Block>,
600    ) -> Result<Vec<ProtocolUnitInfo<TxHash<Pool>, ExtrinsicFor<Block>>>, RelayError> {
601        let txns = block_transactions(block_hash, self.client.as_ref())?;
602        Ok(txns
603            .into_iter()
604            .map(|extrinsic| {
605                let send_tx = extrinsic.encoded_size() <= TX_SIZE_THRESHOLD.get()
606                    || self
607                        .client
608                        .runtime_api()
609                        .is_inherent(*block_hash, &extrinsic)
610                        .unwrap_or(false);
611                ProtocolUnitInfo {
612                    id: self.transaction_pool.hash_of(&extrinsic),
613                    unit: if send_tx { Some(extrinsic) } else { None },
614                }
615            })
616            .collect())
617    }
618
619    fn protocol_unit(
620        &self,
621        block_hash: &BlockHashFor<Block>,
622        tx_hash: &TxHash<Pool>,
623    ) -> Option<ExtrinsicFor<Block>> {
624        // Look up the block extrinsics.
625        match block_transactions(block_hash, self.client.as_ref()) {
626            Ok(extrinsics) => {
627                for extrinsic in extrinsics {
628                    if self.transaction_pool.hash_of(&extrinsic) == *tx_hash {
629                        return Some(extrinsic);
630                    }
631                }
632            }
633            Err(err) => {
634                debug!(
635                    ?block_hash,
636                    ?tx_hash,
637                    ?err,
638                    "consensus server protocol_unit: "
639                );
640            }
641        }
642
643        // Next look up the transaction pool.
644        self.transaction_pool
645            .ready_transaction(tx_hash)
646            .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
647    }
648}
649
650/// Retrieves the block transactions/tx hash from the backend.
651fn block_transactions<Block, Client>(
652    block_hash: &BlockHashFor<Block>,
653    client: &Client,
654) -> Result<Vec<ExtrinsicFor<Block>>, RelayError>
655where
656    Block: BlockT,
657    Client: HeaderBackend<Block> + BlockBackend<Block>,
658{
659    let maybe_extrinsics = client
660        .block_body(*block_hash)
661        .map_err(|err| RelayError::BlockBody(format!("{block_hash:?}, {err:?}")))?;
662    maybe_extrinsics.ok_or(RelayError::BlockExtrinsicsNotFound(format!(
663        "{block_hash:?}"
664    )))
665}
666
667#[derive(Debug, thiserror::Error)]
668pub enum BlockRelayConfigurationError {
669    #[error(transparent)]
670    PrometheusError(#[from] PrometheusError),
671}
672
673/// Sets up the relay components.
674pub fn build_consensus_relay<Block, Client, Pool>(
675    network: Arc<NetworkWrapper>,
676    client: Arc<Client>,
677    pool: Arc<Pool>,
678    registry: Option<&Registry>,
679) -> Result<
680    BlockRelayParams<Block, NetworkWorker<Block, BlockHashFor<Block>>>,
681    BlockRelayConfigurationError,
682>
683where
684    Block: BlockT,
685    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block> + 'static,
686    Client::Api: SubspaceApi<Block, PublicKey>,
687    Pool: TransactionPool<Block = Block> + 'static,
688{
689    let (tx, request_receiver) = async_channel::bounded(NUM_PEER_HINT.get());
690
691    let backend = Arc::new(ConsensusClientBackend {
692        transaction_pool: pool.clone(),
693    });
694    let metrics = ConsensusClientMetrics::new(registry)
695        .map_err(BlockRelayConfigurationError::PrometheusError)?;
696    let relay_client: ConsensusRelayClient<Block, Pool> = ConsensusRelayClient::new(
697        network,
698        SYNC_PROTOCOL.into(),
699        CompactBlockClient::new(),
700        backend,
701        metrics,
702    );
703
704    let backend = Arc::new(ConsensusServerBackend {
705        client: client.clone(),
706        transaction_pool: pool.clone(),
707    });
708    let metrics = ConsensusServerMetrics::new(registry)
709        .map_err(BlockRelayConfigurationError::PrometheusError)?;
710    let relay_server = ConsensusRelayServer::new(
711        client,
712        CompactBlockServer::new(),
713        request_receiver,
714        backend,
715        metrics,
716    );
717
718    let mut protocol_config = ProtocolConfig {
719        name: SYNC_PROTOCOL.into(),
720        fallback_names: Vec::new(),
721        max_request_size: 1024 * 1024,
722        max_response_size: 16 * 1024 * 1024,
723        request_timeout: Duration::from_secs(20),
724        inbound_queue: None,
725    };
726    protocol_config.inbound_queue = Some(tx);
727
728    Ok(BlockRelayParams {
729        server: Box::new(relay_server),
730        downloader: Arc::new(relay_client),
731        request_response_config: protocol_config,
732    })
733}