sc_subspace_block_relay/consensus/
relay.rs

1//! Consensus block relay implementation.
2
3use crate::consensus::types::{
4    ConsensusClientMetrics, ConsensusRequest, ConsensusServerMetrics, FullDownloadRequest,
5    FullDownloadResponse, InitialRequest, InitialResponse, PartialBlock, ProtocolInitialRequest,
6    ProtocolInitialResponse, ProtocolMessage,
7};
8use crate::protocol::compact_block::{
9    CompactBlockClient, CompactBlockHandshake, CompactBlockServer,
10};
11use crate::protocol::{ClientBackend, ProtocolUnitInfo, ServerBackend};
12use crate::types::{RelayError, RequestResponseErr};
13use crate::utils::{NetworkPeerHandle, NetworkWrapper};
14use crate::LOG_TARGET;
15use async_trait::async_trait;
16use futures::channel::oneshot;
17use futures::stream::StreamExt;
18use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
19use sc_client_api::{BlockBackend, HeaderBackend};
20use sc_network::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
21use sc_network::types::ProtocolName;
22use sc_network::{NetworkWorker, OutboundFailure, PeerId, RequestFailure};
23use sc_network_common::sync::message::{
24    BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
25};
26use sc_network_sync::block_relay_protocol::{
27    BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer,
28};
29use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxHash};
30use sp_api::ProvideRuntimeApi;
31use sp_consensus_subspace::SubspaceApi;
32use sp_runtime::generic::BlockId;
33use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
34use std::fmt;
35use std::num::{NonZeroU32, NonZeroUsize};
36use std::sync::Arc;
37use std::time::{Duration, Instant};
38use subspace_core_primitives::PublicKey;
39use subspace_runtime_primitives::{BlockHashFor, ExtrinsicFor};
40use substrate_prometheus_endpoint::{PrometheusError, Registry};
41use tracing::{debug, info, trace, warn};
42
/// Name of the request/response protocol used by both the relay client and the relay server.
const SYNC_PROTOCOL: &str = "/subspace/consensus-block-relay/1";

// Capacity of the bounded channel that carries incoming requests to the relay server.
// TODO: size these properly, or move to config
const NUM_PEER_HINT: NonZeroUsize = NonZeroUsize::new(100).expect("Not zero; qed");

// The two response limits below are the same limits used by the substrate block handler.

/// Maximum response size (bytes) of a full download response.
const MAX_RESPONSE_SIZE: NonZeroUsize = NonZeroUsize::new(8 * 1024 * 1024).expect("Not zero; qed");
/// Maximum number of blocks in a full download response.
const MAX_RESPONSE_BLOCKS: NonZeroU32 = NonZeroU32::new(128).expect("Not zero; qed");

/// If the encoded size of the extrinsic is at most this threshold (bytes),
/// the server returns the full extrinsic along with the tx hash.
const TX_SIZE_THRESHOLD: NonZeroUsize = NonZeroUsize::new(32).expect("Not zero; qed");
57
58/// The client side of the consensus block relay
59struct ConsensusRelayClient<Block, Pool>
60where
61    Block: BlockT,
62    Pool: TransactionPool,
63{
64    network: Arc<NetworkWrapper>,
65    protocol_name: ProtocolName,
66    compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
67    backend: Arc<ConsensusClientBackend<Pool>>,
68    metrics: ConsensusClientMetrics,
69    _phantom_data: std::marker::PhantomData<(Block, Pool)>,
70}
71
72impl<Block, Pool> fmt::Debug for ConsensusRelayClient<Block, Pool>
73where
74    Block: BlockT,
75    Pool: TransactionPool,
76{
77    #[inline]
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        f.debug_struct("ConsensusRelayClient")
80            .field("protocol_name", &self.protocol_name)
81            .finish_non_exhaustive()
82    }
83}
84
85impl<Block, Pool> ConsensusRelayClient<Block, Pool>
86where
87    Block: BlockT,
88    Pool: TransactionPool<Block = Block> + 'static,
89{
90    /// Creates the consensus relay client.
91    fn new(
92        network: Arc<NetworkWrapper>,
93        protocol_name: ProtocolName,
94        compact_block: CompactBlockClient<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
95        backend: Arc<ConsensusClientBackend<Pool>>,
96        metrics: ConsensusClientMetrics,
97    ) -> Self {
98        Self {
99            network,
100            protocol_name,
101            compact_block,
102            backend,
103            metrics,
104            _phantom_data: Default::default(),
105        }
106    }
107
108    /// Downloads the requested block from the peer using the relay protocol.
109    async fn download(
110        &self,
111        who: PeerId,
112        request: BlockRequest<Block>,
113    ) -> Result<Vec<BlockData<Block>>, RelayError> {
114        let start_ts = Instant::now();
115        let network_peer_handle = self
116            .network
117            .network_peer_handle(self.protocol_name.clone(), who)?;
118
119        // Perform the initial request/response
120        let initial_request = InitialRequest {
121            from_block: match request.from {
122                FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
123                FromBlock::Number(n) => BlockId::<Block>::Number(n),
124            },
125            block_attributes: request.fields,
126            protocol_request: ProtocolInitialRequest::from(
127                self.compact_block
128                    .build_initial_request(self.backend.as_ref()),
129            ),
130        };
131        let initial_response = network_peer_handle
132            .request::<_, InitialResponse<Block, TxHash<Pool>>>(ConsensusRequest::<
133                Block,
134                TxHash<Pool>,
135            >::from(initial_request))
136            .await?;
137
138        // Resolve the protocol response to get the extrinsics
139        let (body, local_miss) = if let Some(protocol_response) = initial_response.protocol_response
140        {
141            let (body, local_miss) = self
142                .resolve_extrinsics::<ConsensusRequest<Block, TxHash<Pool>>>(
143                    protocol_response,
144                    &network_peer_handle,
145                )
146                .await?;
147            (Some(body), local_miss)
148        } else {
149            (None, 0)
150        };
151
152        // Assemble the final response
153        let downloaded = vec![initial_response.partial_block.block_data(body)];
154        debug!(
155            target: LOG_TARGET,
156            block_hash = ?initial_response.block_hash,
157            download_bytes = %downloaded.encoded_size(),
158            %local_miss,
159            duration = ?start_ts.elapsed(),
160            "block_download",
161        );
162        Ok(downloaded)
163    }
164
165    /// Downloads the requested blocks from the peer, without using the relay protocol.
166    async fn full_download(
167        &self,
168        who: PeerId,
169        request: BlockRequest<Block>,
170    ) -> Result<Vec<BlockData<Block>>, RelayError> {
171        let start_ts = Instant::now();
172        let network_peer_handle = self
173            .network
174            .network_peer_handle(self.protocol_name.clone(), who)?;
175
176        let server_request =
177            ConsensusRequest::<Block, TxHash<Pool>>::from(FullDownloadRequest(request.clone()));
178        let full_response = network_peer_handle
179            .request::<_, FullDownloadResponse<Block>>(server_request)
180            .await?;
181        let downloaded = full_response.0;
182
183        debug!(
184            target: LOG_TARGET,
185            ?request,
186            download_blocks =  %downloaded.len(),
187            download_bytes = %downloaded.encoded_size(),
188            duration = ?start_ts.elapsed(),
189            "full_download",
190        );
191        Ok(downloaded)
192    }
193
194    /// Resolves the extrinsics from the initial response
195    async fn resolve_extrinsics<Request>(
196        &self,
197        protocol_response: ProtocolInitialResponse<Block, TxHash<Pool>>,
198        network_peer_handle: &NetworkPeerHandle,
199    ) -> Result<(Vec<ExtrinsicFor<Block>>, usize), RelayError>
200    where
201        Request:
202            From<CompactBlockHandshake<BlockHashFor<Block>, TxHash<Pool>>> + Encode + Send + Sync,
203    {
204        let ProtocolInitialResponse::CompactBlock(compact_response) = protocol_response;
205        let (block_hash, resolved) = self
206            .compact_block
207            .resolve_initial_response::<Request>(
208                compact_response,
209                network_peer_handle,
210                self.backend.as_ref(),
211            )
212            .await?;
213        let mut local_miss = 0;
214        let extrinsics = resolved
215            .into_iter()
216            .map(|entry| {
217                let encoded = entry.protocol_unit.encode();
218                if !entry.locally_resolved {
219                    trace!(
220                        target: LOG_TARGET,
221                        ?block_hash,
222                        tx_hash = ?entry.protocol_unit_id,
223                        tx_size = %encoded.len(),
224                        "resolve_extrinsics: local miss"
225                    );
226                    self.metrics.tx_pool_miss.inc();
227                    local_miss += encoded.len();
228                }
229                entry.protocol_unit
230            })
231            .collect();
232        Ok((extrinsics, local_miss))
233    }
234}
235
236#[async_trait]
237impl<Block, Pool> BlockDownloader<Block> for ConsensusRelayClient<Block, Pool>
238where
239    Block: BlockT,
240    Pool: TransactionPool<Block = Block> + 'static,
241{
242    fn protocol_name(&self) -> &ProtocolName {
243        &self.protocol_name
244    }
245
246    async fn download_blocks(
247        &self,
248        who: PeerId,
249        request: BlockRequest<Block>,
250    ) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled> {
251        let full_download = request.max.is_some_and(|max_blocks| max_blocks > 1);
252        let ret = if full_download {
253            self.full_download(who, request.clone()).await
254        } else {
255            self.download(who, request.clone()).await
256        };
257        match ret {
258            Ok(blocks) => {
259                self.metrics.on_download::<Block>(&blocks);
260                Ok(Ok((blocks.encode(), self.protocol_name.clone())))
261            }
262            Err(error) => {
263                debug!(
264                    target: LOG_TARGET,
265                    peer=?who,
266                    ?request,
267                    ?error,
268                    "download_block failed"
269                );
270                self.metrics.on_download_fail(&error);
271                match error {
272                    RelayError::RequestResponse(error) => match error {
273                        RequestResponseErr::DecodeFailed { .. } => {
274                            Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
275                        }
276                        RequestResponseErr::RequestFailure(err) => Ok(Err(err)),
277                        RequestResponseErr::NetworkUninitialized => {
278                            // TODO: This is the best error found that kind of matches
279                            Ok(Err(RequestFailure::NotConnected))
280                        }
281                        RequestResponseErr::Canceled => Err(oneshot::Canceled),
282                    },
283                    _ => {
284                        // Why timeout???
285                        Ok(Err(RequestFailure::Network(OutboundFailure::Timeout)))
286                    }
287                }
288            }
289        }
290    }
291
292    fn block_response_into_blocks(
293        &self,
294        _request: &BlockRequest<Block>,
295        response: Vec<u8>,
296    ) -> Result<Vec<BlockData<Block>>, BlockResponseError> {
297        Decode::decode(&mut response.as_ref()).map_err(|err| {
298            BlockResponseError::DecodeFailed(format!(
299                "Failed to decode consensus response: {err:?}"
300            ))
301        })
302    }
303}
304
305/// The server side of the consensus block relay
306struct ConsensusRelayServer<Block: BlockT, Client, Pool: TransactionPool> {
307    client: Arc<Client>,
308    compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
309    request_receiver: async_channel::Receiver<IncomingRequest>,
310    backend: Arc<ConsensusServerBackend<Client, Pool>>,
311    metrics: ConsensusServerMetrics,
312    _block: std::marker::PhantomData<Block>,
313}
314
315impl<Block, Client, Pool> ConsensusRelayServer<Block, Client, Pool>
316where
317    Block: BlockT,
318    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
319    Client::Api: SubspaceApi<Block, PublicKey>,
320    Pool: TransactionPool<Block = Block> + 'static,
321{
322    /// Creates the consensus relay server.
323    fn new(
324        client: Arc<Client>,
325        compact_block: CompactBlockServer<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>,
326        request_receiver: async_channel::Receiver<IncomingRequest>,
327        backend: Arc<ConsensusServerBackend<Client, Pool>>,
328        metrics: ConsensusServerMetrics,
329    ) -> Self {
330        Self {
331            client,
332            compact_block,
333            request_receiver,
334            backend,
335            metrics,
336            _block: Default::default(),
337        }
338    }
339
340    /// Handles the received request from the client side
341    async fn process_incoming_request(&self, request: IncomingRequest) {
342        // Drop the request in case of errors and let the client time out.
343        // This is the behavior of the current substrate block handler.
344        let IncomingRequest {
345            peer,
346            payload,
347            pending_response,
348        } = request;
349        let req: ConsensusRequest<Block, TxHash<Pool>> = match Decode::decode(&mut payload.as_ref())
350        {
351            Ok(msg) => msg,
352            Err(err) => {
353                warn!(
354                    target: LOG_TARGET,
355                    ?peer,
356                    ?err,
357                    "Decode failed"
358                );
359                return;
360            }
361        };
362
363        let ret = match req {
364            ConsensusRequest::BlockDownloadV0(req) => {
365                self.on_initial_request(req).map(|rsp| rsp.encode())
366            }
367            ConsensusRequest::ProtocolMessageV0(msg) => self.on_protocol_message(msg),
368            ConsensusRequest::FullBlockDownloadV0(req) => {
369                self.on_full_download_request(req).map(|rsp| rsp.encode())
370            }
371        };
372
373        match ret {
374            Ok(response) => {
375                self.metrics.on_request();
376                self.send_response(peer, response, pending_response);
377                trace!(
378                    target: LOG_TARGET,
379                    ?peer,
380                    "server: request processed from"
381                );
382            }
383            Err(error) => {
384                self.metrics.on_failed_request(&error);
385                debug!(
386                    target: LOG_TARGET,
387                    ?peer,
388                    ?error,
389                    "Server error"
390                );
391            }
392        }
393    }
394
395    /// Handles the initial request from the client
396    fn on_initial_request(
397        &self,
398        initial_request: InitialRequest<Block>,
399    ) -> Result<InitialResponse<Block, TxHash<Pool>>, RelayError> {
400        let block_hash = self.block_hash(&initial_request.from_block)?;
401        let block_attributes = initial_request.block_attributes;
402
403        // Build the generic and the protocol specific parts of the response
404        let partial_block = self.get_partial_block(&block_hash, block_attributes)?;
405        let ProtocolInitialRequest::CompactBlock(compact_request) =
406            initial_request.protocol_request;
407        let protocol_response = if block_attributes.contains(BlockAttributes::BODY) {
408            let compact_response = self.compact_block.build_initial_response(
409                &block_hash,
410                compact_request,
411                self.backend.as_ref(),
412            )?;
413            Some(ProtocolInitialResponse::from(compact_response))
414        } else {
415            None
416        };
417
418        Ok(InitialResponse {
419            block_hash,
420            partial_block,
421            protocol_response,
422        })
423    }
424
425    /// Handles the protocol message from the client
426    fn on_protocol_message(
427        &self,
428        msg: ProtocolMessage<Block, TxHash<Pool>>,
429    ) -> Result<Vec<u8>, RelayError> {
430        let response = match msg {
431            ProtocolMessage::CompactBlock(msg) => self
432                .compact_block
433                .on_protocol_message(msg, self.backend.as_ref())?
434                .encode(),
435        };
436        Ok(response)
437    }
438
439    /// Handles the full download request from the client
440    fn on_full_download_request(
441        &self,
442        full_download_request: FullDownloadRequest<Block>,
443    ) -> Result<FullDownloadResponse<Block>, RelayError> {
444        let block_request = full_download_request.0;
445        let mut blocks = Vec::new();
446        let mut block_id = match block_request.from {
447            FromBlock::Hash(h) => BlockId::<Block>::Hash(h),
448            FromBlock::Number(n) => BlockId::<Block>::Number(n),
449        };
450
451        // This is a compact length encoding of max number of blocks to be sent in response
452        let mut total_size = Compact::compact_len(&MAX_RESPONSE_BLOCKS.get());
453        let max_blocks = block_request.max.map_or(MAX_RESPONSE_BLOCKS.into(), |val| {
454            std::cmp::min(val, MAX_RESPONSE_BLOCKS.into())
455        });
456        while blocks.len() < max_blocks as usize {
457            let block_hash = self.block_hash(&block_id)?;
458            let partial_block = self.get_partial_block(&block_hash, block_request.fields)?;
459            let body = if block_request.fields.contains(BlockAttributes::BODY) {
460                Some(block_transactions(&block_hash, self.client.as_ref())?)
461            } else {
462                None
463            };
464            let block_number = partial_block.block_number;
465            let parent_hash = partial_block.parent_hash;
466            let block_data = partial_block.block_data(body);
467
468            // Enforce the max limit on response size.
469            let bytes = block_data.encoded_size();
470            if !blocks.is_empty() && (total_size + bytes) > MAX_RESPONSE_SIZE.into() {
471                break;
472            }
473            total_size += bytes;
474            blocks.push(block_data);
475
476            block_id = match block_request.direction {
477                Direction::Ascending => BlockId::Number(block_number + One::one()),
478                Direction::Descending => {
479                    if block_number.is_zero() {
480                        break;
481                    }
482                    BlockId::Hash(parent_hash)
483                }
484            };
485        }
486
487        Ok(FullDownloadResponse(blocks))
488    }
489
490    /// Builds the partial block response
491    fn get_partial_block(
492        &self,
493        block_hash: &BlockHashFor<Block>,
494        block_attributes: BlockAttributes,
495    ) -> Result<PartialBlock<Block>, RelayError> {
496        let block_header = match self.client.header(*block_hash) {
497            Ok(Some(header)) => header,
498            Ok(None) => {
499                return Err(RelayError::BlockHeader(format!(
500                    "Missing header: {block_hash:?}"
501                )))
502            }
503            Err(err) => return Err(RelayError::BlockHeader(format!("{block_hash:?}, {err:?}"))),
504        };
505        let parent_hash = *block_header.parent_hash();
506        let block_number = *block_header.number();
507        let block_header_hash = block_header.hash();
508
509        let header = if block_attributes.contains(BlockAttributes::HEADER) {
510            Some(block_header)
511        } else {
512            None
513        };
514
515        let indexed_body = if block_attributes.contains(BlockAttributes::INDEXED_BODY) {
516            self.client
517                .block_indexed_body(*block_hash)
518                .map_err(|err| RelayError::BlockIndexedBody(format!("{block_hash:?}, {err:?}")))?
519        } else {
520            None
521        };
522
523        let justifications = if block_attributes.contains(BlockAttributes::JUSTIFICATION) {
524            self.client.justifications(*block_hash).map_err(|err| {
525                RelayError::BlockJustifications(format!("{block_hash:?}, {err:?}"))
526            })?
527        } else {
528            None
529        };
530
531        Ok(PartialBlock {
532            parent_hash,
533            block_number,
534            block_header_hash,
535            header,
536            indexed_body,
537            justifications,
538        })
539    }
540
541    /// Converts the BlockId to block hash
542    fn block_hash(&self, block_id: &BlockId<Block>) -> Result<BlockHashFor<Block>, RelayError> {
543        match self.client.block_hash_from_id(block_id) {
544            Ok(Some(hash)) => Ok(hash),
545            Ok(None) => Err(RelayError::BlockHash(format!("Missing: {block_id:?}"))),
546            Err(err) => Err(RelayError::BlockHash(format!("{block_id:?}, {err:?}"))),
547        }
548    }
549
550    /// Builds/sends the response back to the client
551    fn send_response(
552        &self,
553        peer: PeerId,
554        response: Vec<u8>,
555        sender: oneshot::Sender<OutgoingResponse>,
556    ) {
557        let response = OutgoingResponse {
558            result: Ok(response),
559            reputation_changes: Vec::new(),
560            sent_feedback: None,
561        };
562        if sender.send(response).is_err() {
563            warn!(
564                target: LOG_TARGET,
565                ?peer,
566                "Failed to send response"
567            );
568        }
569    }
570}
571
572#[async_trait]
573impl<Block, Client, Pool> BlockServer<Block> for ConsensusRelayServer<Block, Client, Pool>
574where
575    Block: BlockT,
576    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
577    Client::Api: SubspaceApi<Block, PublicKey>,
578    Pool: TransactionPool<Block = Block> + 'static,
579{
580    async fn run(&mut self) {
581        info!(
582            target: LOG_TARGET,
583            "relay::consensus block server: starting"
584        );
585        while let Some(request) = self.request_receiver.next().await {
586            self.process_incoming_request(request).await;
587        }
588    }
589}
590
591/// The client backend.
592struct ConsensusClientBackend<Pool> {
593    transaction_pool: Arc<Pool>,
594}
595
596impl<Block, Pool> ClientBackend<TxHash<Pool>, ExtrinsicFor<Block>> for ConsensusClientBackend<Pool>
597where
598    Block: BlockT,
599    Pool: TransactionPool<Block = Block> + 'static,
600{
601    fn protocol_unit(&self, tx_hash: &TxHash<Pool>) -> Option<ExtrinsicFor<Block>> {
602        // Look up the transaction pool.
603        self.transaction_pool
604            .ready_transaction(tx_hash)
605            .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
606    }
607}
608
609/// The server backend.
610struct ConsensusServerBackend<Client, Pool> {
611    client: Arc<Client>,
612    transaction_pool: Arc<Pool>,
613}
614
615impl<Block, Client, Pool> ServerBackend<BlockHashFor<Block>, TxHash<Pool>, ExtrinsicFor<Block>>
616    for ConsensusServerBackend<Client, Pool>
617where
618    Block: BlockT,
619    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block>,
620    Client::Api: SubspaceApi<Block, PublicKey>,
621    Pool: TransactionPool<Block = Block> + 'static,
622{
623    fn download_unit_members(
624        &self,
625        block_hash: &BlockHashFor<Block>,
626    ) -> Result<Vec<ProtocolUnitInfo<TxHash<Pool>, ExtrinsicFor<Block>>>, RelayError> {
627        let txns = block_transactions(block_hash, self.client.as_ref())?;
628        Ok(txns
629            .into_iter()
630            .map(|extrinsic| {
631                let send_tx = extrinsic.encoded_size() <= TX_SIZE_THRESHOLD.get()
632                    || self
633                        .client
634                        .runtime_api()
635                        .is_inherent(*block_hash, &extrinsic)
636                        .unwrap_or(false);
637                ProtocolUnitInfo {
638                    id: self.transaction_pool.hash_of(&extrinsic),
639                    unit: if send_tx { Some(extrinsic) } else { None },
640                }
641            })
642            .collect())
643    }
644
645    fn protocol_unit(
646        &self,
647        block_hash: &BlockHashFor<Block>,
648        tx_hash: &TxHash<Pool>,
649    ) -> Option<ExtrinsicFor<Block>> {
650        // Look up the block extrinsics.
651        match block_transactions(block_hash, self.client.as_ref()) {
652            Ok(extrinsics) => {
653                for extrinsic in extrinsics {
654                    if self.transaction_pool.hash_of(&extrinsic) == *tx_hash {
655                        return Some(extrinsic);
656                    }
657                }
658            }
659            Err(err) => {
660                debug!(
661                    target: LOG_TARGET,
662                    ?block_hash,
663                    ?tx_hash,
664                    ?err,
665                    "consensus server protocol_unit: "
666                );
667            }
668        }
669
670        // Next look up the transaction pool.
671        self.transaction_pool
672            .ready_transaction(tx_hash)
673            .map(|in_pool_tx| in_pool_tx.data().as_ref().clone())
674    }
675}
676
677/// Retrieves the block transactions/tx hash from the backend.
678fn block_transactions<Block, Client>(
679    block_hash: &BlockHashFor<Block>,
680    client: &Client,
681) -> Result<Vec<ExtrinsicFor<Block>>, RelayError>
682where
683    Block: BlockT,
684    Client: HeaderBackend<Block> + BlockBackend<Block>,
685{
686    let maybe_extrinsics = client
687        .block_body(*block_hash)
688        .map_err(|err| RelayError::BlockBody(format!("{block_hash:?}, {err:?}")))?;
689    maybe_extrinsics.ok_or(RelayError::BlockExtrinsicsNotFound(format!(
690        "{block_hash:?}"
691    )))
692}
693
/// Errors that can occur while setting up the consensus block relay.
#[derive(Debug, thiserror::Error)]
pub enum BlockRelayConfigurationError {
    /// A Prometheus error; the message is forwarded from the wrapped error.
    #[error(transparent)]
    PrometheusError(#[from] PrometheusError),
}
699
700/// Sets up the relay components.
701pub fn build_consensus_relay<Block, Client, Pool>(
702    network: Arc<NetworkWrapper>,
703    client: Arc<Client>,
704    pool: Arc<Pool>,
705    registry: Option<&Registry>,
706) -> Result<
707    BlockRelayParams<Block, NetworkWorker<Block, BlockHashFor<Block>>>,
708    BlockRelayConfigurationError,
709>
710where
711    Block: BlockT,
712    Client: HeaderBackend<Block> + BlockBackend<Block> + ProvideRuntimeApi<Block> + 'static,
713    Client::Api: SubspaceApi<Block, PublicKey>,
714    Pool: TransactionPool<Block = Block> + 'static,
715{
716    let (tx, request_receiver) = async_channel::bounded(NUM_PEER_HINT.get());
717
718    let backend = Arc::new(ConsensusClientBackend {
719        transaction_pool: pool.clone(),
720    });
721    let metrics = ConsensusClientMetrics::new(registry)
722        .map_err(BlockRelayConfigurationError::PrometheusError)?;
723    let relay_client: ConsensusRelayClient<Block, Pool> = ConsensusRelayClient::new(
724        network,
725        SYNC_PROTOCOL.into(),
726        CompactBlockClient::new(),
727        backend,
728        metrics,
729    );
730
731    let backend = Arc::new(ConsensusServerBackend {
732        client: client.clone(),
733        transaction_pool: pool.clone(),
734    });
735    let metrics = ConsensusServerMetrics::new(registry)
736        .map_err(BlockRelayConfigurationError::PrometheusError)?;
737    let relay_server = ConsensusRelayServer::new(
738        client,
739        CompactBlockServer::new(),
740        request_receiver,
741        backend,
742        metrics,
743    );
744
745    let mut protocol_config = ProtocolConfig {
746        name: SYNC_PROTOCOL.into(),
747        fallback_names: Vec::new(),
748        max_request_size: 1024 * 1024,
749        max_response_size: 16 * 1024 * 1024,
750        request_timeout: Duration::from_secs(20),
751        inbound_queue: None,
752    };
753    protocol_config.inbound_queue = Some(tx);
754
755    Ok(BlockRelayParams {
756        server: Box::new(relay_server),
757        downloader: Arc::new(relay_client),
758        request_response_config: protocol_config,
759    })
760}