sc_consensus_subspace_rpc/
lib.rs

1//! RPC api for Subspace.
2
3#![feature(try_blocks)]
4
5use futures::channel::mpsc;
6use futures::{future, stream, FutureExt, StreamExt};
7use jsonrpsee::core::async_trait;
8use jsonrpsee::proc_macros::rpc;
9use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
10use jsonrpsee::{Extensions, PendingSubscriptionSink};
11use parking_lot::Mutex;
12use sc_client_api::{AuxStore, BlockBackend};
13use sc_consensus_subspace::archiver::{
14    recreate_genesis_segment, ArchivedSegmentNotification, ObjectMappingNotification,
15    SegmentHeadersStore,
16};
17use sc_consensus_subspace::notification::SubspaceNotificationStream;
18use sc_consensus_subspace::slot_worker::{
19    NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle,
20};
21use sc_rpc::utils::{BoundedVecDeque, PendingSubscription};
22use sc_rpc::SubscriptionTaskExecutor;
23use sc_rpc_api::{check_if_safe, UnsafeRpcError};
24use sc_utils::mpsc::TracingUnboundedSender;
25use schnellru::{ByLength, LruMap};
26use sp_api::{ApiError, ProvideRuntimeApi};
27use sp_blockchain::HeaderBackend;
28use sp_consensus::SyncOracle;
29use sp_consensus_subspace::{ChainConstants, SubspaceApi};
30use sp_core::H256;
31use sp_objects::ObjectsApi;
32use sp_runtime::traits::Block as BlockT;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::marker::PhantomData;
36use std::sync::atomic::{AtomicU64, Ordering};
37use std::sync::{Arc, Weak};
38use std::time::Duration;
39use subspace_archiving::archiver::NewArchivedSegment;
40use subspace_core_primitives::hashes::Blake3Hash;
41use subspace_core_primitives::objects::GlobalObjectMapping;
42use subspace_core_primitives::pieces::{Piece, PieceIndex};
43use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
44use subspace_core_primitives::solutions::Solution;
45use subspace_core_primitives::{BlockHash, PublicKey, SlotNumber};
46use subspace_erasure_coding::ErasureCoding;
47use subspace_farmer_components::FarmerProtocolInfo;
48use subspace_kzg::Kzg;
49use subspace_networking::libp2p::Multiaddr;
50use subspace_rpc_primitives::{
51    FarmerAppInfo, ObjectMappingResponse, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
52    SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST,
53};
54use tracing::{debug, error, warn};
55
/// Base value for the JSON-RPC error codes produced by this RPC handler; specific errors are
/// expressed as offsets from it.
const SUBSPACE_ERROR: i32 = 9000;
/// This is essentially equal to expected number of votes per block, one more is added implicitly by
/// the fact that channel sender exists
const SOLUTION_SENDER_CHANNEL_CAPACITY: usize = 9;
/// Upper bound on how long the task forwarding a reward signature is kept alive while waiting for
/// the signature to be submitted.
const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500);

/// The number of object mappings to include in each subscription response message.
///
/// This is a tradeoff between `RPC_DEFAULT_MESSAGE_CAPACITY_PER_CONN` and
/// `RPC_DEFAULT_MAX_RESPONSE_SIZE_MB`. We estimate 500K mappings per segment,
/// and the minimum hex-encoded mapping size is 88 bytes.
// TODO: make this into a CLI option, or calculate this from other CLI options
const OBJECT_MAPPING_BATCH_SIZE: usize = 1000;

/// The maximum number of object hashes allowed in a subscription filter.
///
/// Each hash takes up 64 bytes in JSON, and 32 bytes in memory.
// TODO: make this into a CLI option, or calculate this from other CLI options
const MAX_OBJECT_HASHES_PER_SUBSCRIPTION: usize = 1000;
75
// TODO: More specific errors instead of `StringError`
/// Top-level error type for the RPC handler.
///
/// Converted into a JSON-RPC error object before being returned to the RPC client.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Errors that can be formatted as a String; the string is used verbatim as the error
    /// message
    #[error("{0}")]
    StringError(String),
    /// Call to an unsafe RPC was denied.
    #[error(transparent)]
    UnsafeRpcCalled(#[from] UnsafeRpcError),
}
87
88impl From<Error> for ErrorObjectOwned {
89    fn from(error: Error) -> Self {
90        match error {
91            Error::StringError(e) => ErrorObject::owned(SUBSPACE_ERROR + 1, e, None::<()>),
92            Error::UnsafeRpcCalled(e) => e.into(),
93        }
94    }
95}
96
/// Provides rpc methods for interacting with Subspace.
#[rpc(client, server)]
pub trait SubspaceRpcApi {
    /// Get metadata necessary for farmer operation
    #[method(name = "subspace_getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submit a solution response for the slot it was produced for
    #[method(name = "subspace_submitSolutionResponse", with_extensions)]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Slot info subscription
    #[subscription(
        name = "subspace_subscribeSlotInfo" => "subspace_slot_info",
        unsubscribe = "subspace_unsubscribeSlotInfo",
        item = SlotInfo,
        with_extensions,
    )]
    fn subscribe_slot_info(&self);

    /// Sign block subscription
    #[subscription(
        name = "subspace_subscribeRewardSigning" => "subspace_reward_signing",
        unsubscribe = "subspace_unsubscribeRewardSigning",
        item = RewardSigningInfo,
        with_extensions,
    )]
    fn subscribe_reward_signing(&self);

    /// Submit a reward signature in response to a reward signing notification
    #[method(name = "subspace_submitRewardSignature", with_extensions)]
    fn submit_reward_signature(
        &self,
        reward_signature: RewardSignatureResponse,
    ) -> Result<(), Error>;

    /// Archived segment header subscription
    #[subscription(
        name = "subspace_subscribeArchivedSegmentHeader" => "subspace_archived_segment_header",
        unsubscribe = "subspace_unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
        with_extensions,
    )]
    fn subscribe_archived_segment_header(&self);

    /// Get segment headers for the given segment indexes, one `Option` entry per requested index
    #[method(name = "subspace_segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indexes: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Get a piece by its index, `None` if the piece is not available from this RPC
    #[method(name = "subspace_piece", blocking, with_extensions)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Acknowledge receipt of the archived segment header with the given segment index
    #[method(name = "subspace_acknowledgeArchivedSegmentHeader", with_extensions)]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

    /// Get the most recent segment headers
    // NOTE(review): `limit` presumably caps the number of returned headers; confirm against the
    // implementation
    #[method(name = "subspace_lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// DSN object mappings subscription
    #[subscription(
        name = "subspace_subscribeObjectMappings" => "subspace_object_mappings",
        unsubscribe = "subspace_unsubscribeObjectMappings",
        item = ObjectMappingResponse,
        with_extensions,
    )]
    fn subscribe_object_mappings(&self);

    /// Filtered DSN object mappings subscription
    #[subscription(
        name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings",
        unsubscribe = "subspace_unsubscribeFilteredObjectMappings",
        item = ObjectMappingResponse,
        with_extensions,
    )]
    fn subscribe_filtered_object_mappings(&self, hashes: Vec<Blake3Hash>);
}
176
/// Acknowledgement senders for the archived segment that is currently awaiting acknowledgement.
///
/// When a notification for a different segment index arrives, the stored senders are discarded
/// and the collection starts over for the new segment.
#[derive(Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    /// Segment index that `senders` belong to
    segment_index: SegmentIndex,
    /// Pending acknowledgement senders, keyed by the ID of the subscription that received the
    /// segment header
    senders: HashMap<u64, TracingUnboundedSender<()>>,
}
182
/// Senders waiting for a reward signature for the hash that is currently being signed.
///
/// When a signing notification for a different hash arrives, the stored senders are discarded.
#[derive(Default)]
struct BlockSignatureSenders {
    /// Hash that `senders` are waiting for a signature of
    current_hash: H256,
    /// One-shot senders, one per reward signing notification delivered to a subscription; each
    /// submitted signature consumes one of them
    senders: Vec<async_oneshot::Sender<RewardSignatureResponse>>,
}
188
/// In-memory cache of last archived segment, such that when request comes back right after
/// archived segment notification, RPC server is able to answer quickly.
///
/// We store weak reference, such that archived segment is not persisted for longer than
/// necessary occupying RAM.
enum CachedArchivedSegment {
    /// Special case for genesis segment when requested over RPC
    Genesis(Arc<NewArchivedSegment>),
    /// Weak reference to an archived segment received through an archived segment notification;
    /// it can no longer be upgraded once all strong references to the segment are gone
    Weak(Weak<NewArchivedSegment>),
}
199
200impl CachedArchivedSegment {
201    fn get(&self) -> Option<Arc<NewArchivedSegment>> {
202        match self {
203            CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
204            CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
205        }
206    }
207}
208
/// Subspace RPC configuration
///
/// Everything here is moved into the RPC handler when it is created.
pub struct SubspaceRpcConfig<Client, SO, AS>
where
    SO: SyncOracle + Send + Sync + Clone + 'static,
    AS: AuxStore + Send + Sync + 'static,
{
    /// Substrate client
    pub client: Arc<Client>,
    /// Task executor that is being used by RPC subscriptions
    pub subscription_executor: SubscriptionTaskExecutor,
    /// New slot notification stream
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Reward signing notification stream
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Object mapping notification stream
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Archived segment notification stream
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// DSN bootstrap nodes
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Segment headers store
    pub segment_headers_store: SegmentHeadersStore<AS>,
    /// Subspace sync oracle
    pub sync_oracle: SubspaceSyncOracle<SO>,
    /// Kzg instance
    pub kzg: Kzg,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
}
239
/// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace.
pub struct SubspaceRpc<Block, Client, SO, AS>
where
    Block: BlockT,
    SO: SyncOracle + Send + Sync + Clone + 'static,
{
    /// Substrate client
    client: Arc<Client>,
    /// Executor on which subscription and forwarding tasks are spawned
    subscription_executor: SubscriptionTaskExecutor,
    new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    archived_segment_notification_stream: SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Per-slot senders through which submitted solutions are forwarded; the map is bounded so
    /// senders for old slots are evicted
    // The nested generic type is spelled out once here rather than hidden behind an alias
    #[allow(clippy::type_complexity)]
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution<PublicKey>>>>>,
    /// Senders awaiting a reward signature for the hash currently being signed
    reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    segment_headers_store: SegmentHeadersStore<AS>,
    /// Last archived segment seen by a subscription (or the re-created genesis segment), used to
    /// serve piece requests
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    /// Senders awaiting acknowledgement of the latest archived segment header
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    /// Counter used to hand out a distinct ID to each archived segment header subscription
    next_subscription_id: AtomicU64,
    sync_oracle: SubspaceSyncOracle<SO>,
    /// Genesis hash of the chain, read from the client once at construction
    genesis_hash: BlockHash,
    /// Chain constants, read from the runtime API once at construction
    chain_constants: ChainConstants,
    /// Max pieces in sector, read from the runtime API once at construction
    max_pieces_in_sector: u16,
    /// Kzg instance, needed to re-create the genesis segment on demand
    kzg: Kzg,
    /// Erasure coding instance, needed to re-create the genesis segment on demand
    erasure_coding: ErasureCoding,
    _block: PhantomData<Block>,
}
269
270/// [`SubspaceRpc`] is used for notifying subscribers about arrival of new slots and for
271/// submission of solutions (or lack thereof).
272///
273/// Internally every time slot notifier emits information about new slot, notification is sent to
274/// every subscriber, after which RPC server waits for the same number of
275/// `subspace_submitSolutionResponse` requests with `SolutionResponse` in them or until
276/// timeout is exceeded. The first valid solution for a particular slot wins, others are ignored.
277impl<Block, Client, SO, AS> SubspaceRpc<Block, Client, SO, AS>
278where
279    Block: BlockT,
280    Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
281    Client::Api: SubspaceApi<Block, PublicKey>,
282    SO: SyncOracle + Send + Sync + Clone + 'static,
283    AS: AuxStore + Send + Sync + 'static,
284{
285    /// Creates a new instance of the `SubspaceRpc` handler.
286    pub fn new(config: SubspaceRpcConfig<Client, SO, AS>) -> Result<Self, ApiError> {
287        let info = config.client.info();
288        let best_hash = info.best_hash;
289        let genesis_hash = BlockHash::try_from(info.genesis_hash.as_ref())
290            .expect("Genesis hash must always be convertible into BlockHash; qed");
291        let runtime_api = config.client.runtime_api();
292        let chain_constants = runtime_api.chain_constants(best_hash)?;
293        // While the number can technically change in runtime, farmer will not adjust to it on the
294        // fly and previous value will remain valid (number only expected to increase), so it is
295        // fine to query it only once
296        let max_pieces_in_sector = runtime_api.max_pieces_in_sector(best_hash)?;
297        let block_authoring_delay = u64::from(chain_constants.block_authoring_delay());
298        let block_authoring_delay = usize::try_from(block_authoring_delay)
299            .expect("Block authoring delay will never exceed usize on any platform; qed");
300        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
301            .expect("Always a tiny constant in the protocol; qed");
302
303        Ok(Self {
304            client: config.client,
305            subscription_executor: config.subscription_executor,
306            new_slot_notification_stream: config.new_slot_notification_stream,
307            reward_signing_notification_stream: config.reward_signing_notification_stream,
308            object_mapping_notification_stream: config.object_mapping_notification_stream,
309            archived_segment_notification_stream: config.archived_segment_notification_stream,
310            solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new(
311                solution_response_senders_capacity,
312            )))),
313            reward_signature_senders: Arc::default(),
314            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
315            segment_headers_store: config.segment_headers_store,
316            cached_archived_segment: Arc::default(),
317            archived_segment_acknowledgement_senders: Arc::default(),
318            next_subscription_id: AtomicU64::default(),
319            sync_oracle: config.sync_oracle,
320            genesis_hash,
321            chain_constants,
322            max_pieces_in_sector,
323            kzg: config.kzg,
324            erasure_coding: config.erasure_coding,
325            _block: PhantomData,
326        })
327    }
328}
329
330#[async_trait]
331impl<Block, Client, SO, AS> SubspaceRpcApiServer for SubspaceRpc<Block, Client, SO, AS>
332where
333    Block: BlockT,
334    Client: ProvideRuntimeApi<Block>
335        + HeaderBackend<Block>
336        + BlockBackend<Block>
337        + Send
338        + Sync
339        + 'static,
340    Client::Api: ObjectsApi<Block>,
341    SO: SyncOracle + Send + Sync + Clone + 'static,
342    AS: AuxStore + Send + Sync + 'static,
343{
344    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
345        let last_segment_index = self
346            .segment_headers_store
347            .max_segment_index()
348            .unwrap_or(SegmentIndex::ZERO);
349
350        let farmer_app_info: Result<FarmerAppInfo, ApiError> = try {
351            let chain_constants = &self.chain_constants;
352            let protocol_info = FarmerProtocolInfo {
353                history_size: HistorySize::from(last_segment_index),
354                max_pieces_in_sector: self.max_pieces_in_sector,
355                recent_segments: chain_constants.recent_segments(),
356                recent_history_fraction: chain_constants.recent_history_fraction(),
357                min_sector_lifetime: chain_constants.min_sector_lifetime(),
358            };
359
360            FarmerAppInfo {
361                genesis_hash: self.genesis_hash,
362                dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
363                syncing: self.sync_oracle.is_major_syncing(),
364                farming_timeout: chain_constants
365                    .slot_duration()
366                    .as_duration()
367                    .mul_f64(SlotNumber::from(chain_constants.block_authoring_delay()) as f64),
368                protocol_info,
369            }
370        };
371
372        farmer_app_info.map_err(|error| {
373            error!("Failed to get data from runtime API: {}", error);
374            Error::StringError("Internal error".to_string())
375        })
376    }
377
378    fn submit_solution_response(
379        &self,
380        ext: &Extensions,
381        solution_response: SolutionResponse,
382    ) -> Result<(), Error> {
383        check_if_safe(ext)?;
384
385        let slot = solution_response.slot_number;
386        let mut solution_response_senders = self.solution_response_senders.lock();
387
388        let success = solution_response_senders
389            .peek_mut(&slot)
390            .and_then(|sender| sender.try_send(solution_response.solution).ok())
391            .is_some();
392
393        if !success {
394            warn!(
395                %slot,
396                "Solution was ignored, likely because farmer was too slow"
397            );
398
399            return Err(Error::StringError("Solution was ignored".to_string()));
400        }
401
402        Ok(())
403    }
404
    /// Subscribes the caller to new slot notifications, delivered as [`SlotInfo`] items.
    ///
    /// If unsafe RPCs are allowed for the caller, each new slot also gets a bounded channel
    /// registered in `solution_response_senders` (unless one is already stored for that slot) and
    /// a background task that forwards solutions received through it to the solution sender that
    /// came with the notification.
    fn subscribe_slot_info(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
        let executor = self.subscription_executor.clone();
        let solution_response_senders = self.solution_response_senders.clone();
        // Decided once per subscription, based on the connection the subscription was made from
        let allow_solutions = check_if_safe(ext).is_ok();

        let handle_slot_notification = move |new_slot_notification| {
            let NewSlotNotification {
                new_slot_info,
                mut solution_sender,
            } = new_slot_notification;

            let slot_number = SlotNumber::from(new_slot_info.slot);

            // Only handle solution responses in case unsafe APIs are allowed
            if allow_solutions {
                // Store solution sender so that we can retrieve it when solution comes from
                // the farmer
                let mut solution_response_senders = solution_response_senders.lock();
                // Only set up the channel and forwarding task if nothing is stored for this slot
                // yet, otherwise the existing sender keeps being used
                if solution_response_senders.peek(&slot_number).is_none() {
                    let (response_sender, mut response_receiver) =
                        mpsc::channel(SOLUTION_SENDER_CHANNEL_CAPACITY);

                    solution_response_senders.insert(slot_number, response_sender);

                    // Wait for solutions and transform proposed proof of space solutions
                    // into data structure `sc-consensus-subspace` expects
                    let forward_solution_fut = async move {
                        // Runs until the stored sender is dropped and the channel is drained
                        while let Some(solution) = response_receiver.next().await {
                            let public_key = solution.public_key;
                            let sector_index = solution.sector_index;

                            let solution = Solution {
                                public_key,
                                reward_address: solution.reward_address,
                                sector_index,
                                history_size: solution.history_size,
                                piece_offset: solution.piece_offset,
                                record_commitment: solution.record_commitment,
                                record_witness: solution.record_witness,
                                chunk: solution.chunk,
                                chunk_witness: solution.chunk_witness,
                                proof_of_space: solution.proof_of_space,
                            };

                            // Failure to forward is only logged, later solutions are still tried
                            if solution_sender.try_send(solution).is_err() {
                                warn!(
                                    slot = %slot_number,
                                    %sector_index,
                                    %public_key,
                                    "Solution receiver is closed, likely because farmer was too slow"
                                );
                            }
                        }
                    };

                    executor.spawn(
                        "subspace-slot-info-forward",
                        Some("rpc"),
                        Box::pin(forward_solution_fut),
                    );
                }
            }

            let global_challenge = new_slot_info
                .proof_of_time
                .derive_global_randomness()
                .derive_global_challenge(slot_number);

            // This will be sent to the farmer
            SlotInfo {
                slot_number,
                global_challenge,
                solution_range: new_slot_info.solution_range,
                voting_solution_range: new_slot_info.voting_solution_range,
            }
        };
        let stream = self
            .new_slot_notification_stream
            .subscribe()
            .map(handle_slot_notification);

        // Drive the subscription in the background, piping converted notifications to the client
        self.subscription_executor.spawn(
            "subspace-slot-info-subscription",
            Some("rpc"),
            PendingSubscription::from(pending)
                .pipe_from_stream(stream, BoundedVecDeque::default())
                .boxed(),
        );
    }
494
    /// Subscribes the caller to reward signing notifications, delivered as [`RewardSigningInfo`]
    /// items.
    ///
    /// The subscription is not set up at all if unsafe RPCs are not allowed for the caller. For
    /// every notification a one-shot sender is stored in `reward_signature_senders` and a
    /// background task is spawned that waits (up to [`REWARD_SIGNING_TIMEOUT`]) for the submitted
    /// signature and forwards it to the signature sender that came with the notification.
    fn subscribe_reward_signing(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
        if check_if_safe(ext).is_err() {
            debug!("Unsafe subscribe_reward_signing ignored");
            return;
        }

        let executor = self.subscription_executor.clone();
        let reward_signature_senders = self.reward_signature_senders.clone();

        let stream = self.reward_signing_notification_stream.subscribe().map(
            move |reward_signing_notification| {
                let RewardSigningNotification {
                    hash,
                    public_key,
                    signature_sender,
                } = reward_signing_notification;

                let (response_sender, response_receiver) = async_oneshot::oneshot();

                // Store signature sender so that we can retrieve it when signature comes from
                // the farmer
                {
                    let mut reward_signature_senders = reward_signature_senders.lock();

                    // Senders stored for a previous hash are of no use anymore
                    if reward_signature_senders.current_hash != hash {
                        reward_signature_senders.current_hash = hash;
                        reward_signature_senders.senders.clear();
                    }

                    reward_signature_senders.senders.push(response_sender);
                }

                // Wait for the signature response and forward the signature, if the response
                // contains one, to the sender that came with the notification
                let forward_signature_fut = async move {
                    if let Ok(reward_signature) = response_receiver.await {
                        if let Some(signature) = reward_signature.signature {
                            let _ = signature_sender.unbounded_send(signature);
                        }
                    }
                };

                // Run above future with timeout
                executor.spawn(
                    "subspace-block-signing-forward",
                    Some("rpc"),
                    future::select(
                        futures_timer::Delay::new(REWARD_SIGNING_TIMEOUT),
                        Box::pin(forward_signature_fut),
                    )
                    .map(|_| ())
                    .boxed(),
                );

                // This will be sent to the farmer
                RewardSigningInfo {
                    hash: hash.into(),
                    public_key,
                }
            },
        );

        // Drive the subscription in the background, piping converted notifications to the client
        self.subscription_executor.spawn(
            "subspace-block-signing-subscription",
            Some("rpc"),
            PendingSubscription::from(pending)
                .pipe_from_stream(stream, BoundedVecDeque::default())
                .boxed(),
        );
    }
565
566    fn submit_reward_signature(
567        &self,
568        ext: &Extensions,
569        reward_signature: RewardSignatureResponse,
570    ) -> Result<(), Error> {
571        check_if_safe(ext)?;
572
573        let reward_signature_senders = self.reward_signature_senders.clone();
574
575        // TODO: This doesn't track what client sent a solution, allowing some clients to send
576        //  multiple (https://github.com/paritytech/jsonrpsee/issues/452)
577        let mut reward_signature_senders = reward_signature_senders.lock();
578
579        if reward_signature_senders.current_hash == reward_signature.hash.into() {
580            if let Some(mut sender) = reward_signature_senders.senders.pop() {
581                let _ = sender.send(reward_signature);
582            }
583        }
584
585        Ok(())
586    }
587
    /// Subscribes the caller to archived segment notifications, delivered as [`SegmentHeader`]
    /// items.
    ///
    /// If unsafe RPCs are allowed for the caller, the acknowledgement sender of each notification
    /// is stored under this subscription's ID until it is acknowledged, and the archived segment
    /// is remembered (weakly) in `cached_archived_segment`. Otherwise segment headers are simply
    /// passed through without requiring acknowledgement.
    fn subscribe_archived_segment_header(
        &self,
        pending: PendingSubscriptionSink,
        ext: &Extensions,
    ) {
        let archived_segment_acknowledgement_senders =
            self.archived_segment_acknowledgement_senders.clone();

        let cached_archived_segment = Arc::clone(&self.cached_archived_segment);
        // Distinct ID for this subscription, used as the key for its acknowledgement sender
        let subscription_id = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
        let allow_acknowledgements = check_if_safe(ext).is_ok();

        let stream = self
            .archived_segment_notification_stream
            .subscribe()
            .filter_map(move |archived_segment_notification| {
                let ArchivedSegmentNotification {
                    archived_segment,
                    acknowledgement_sender,
                } = archived_segment_notification;

                let segment_index = archived_segment.segment_header.segment_index();

                // Store acknowledgment sender so that we can retrieve it when acknowledgement
                // comes from the farmer, but only if unsafe APIs are allowed
                let maybe_archived_segment_header = if allow_acknowledgements {
                    let mut archived_segment_acknowledgement_senders =
                        archived_segment_acknowledgement_senders.lock();

                    // Senders stored for a different segment are discarded
                    if archived_segment_acknowledgement_senders.segment_index != segment_index {
                        archived_segment_acknowledgement_senders.segment_index = segment_index;
                        archived_segment_acknowledgement_senders.senders.clear();
                    }

                    let maybe_archived_segment_header =
                        match archived_segment_acknowledgement_senders
                            .senders
                            .entry(subscription_id)
                        {
                            Entry::Occupied(_) => {
                                // No need to do anything, farmer is processing request
                                None
                            }
                            Entry::Vacant(entry) => {
                                entry.insert(acknowledgement_sender);

                                // This will be sent to the farmer
                                Some(archived_segment.segment_header)
                            }
                        };

                    // Remember the segment for piece requests without keeping it alive
                    cached_archived_segment
                        .lock()
                        .replace(CachedArchivedSegment::Weak(Arc::downgrade(
                            &archived_segment,
                        )));

                    maybe_archived_segment_header
                } else {
                    // In case unsafe APIs are not allowed, just return segment header without
                    // requiring it to be acknowledged
                    Some(archived_segment.segment_header)
                };

                // `filter_map` on a stream expects a future, the value is already known here
                Box::pin(async move { maybe_archived_segment_header })
            });

        let archived_segment_acknowledgement_senders =
            self.archived_segment_acknowledgement_senders.clone();
        let fut = async move {
            PendingSubscription::from(pending)
                .pipe_from_stream(stream, BoundedVecDeque::default())
                .await;

            // Piping is over, drop whatever sender is still stored for this subscription
            let mut archived_segment_acknowledgement_senders =
                archived_segment_acknowledgement_senders.lock();

            archived_segment_acknowledgement_senders
                .senders
                .remove(&subscription_id);
        };

        self.subscription_executor.spawn(
            "subspace-archived-segment-header-subscription",
            Some("rpc"),
            fut.boxed(),
        );
    }
676
677    async fn acknowledge_archived_segment_header(
678        &self,
679        ext: &Extensions,
680        segment_index: SegmentIndex,
681    ) -> Result<(), Error> {
682        check_if_safe(ext)?;
683
684        let archived_segment_acknowledgement_senders =
685            self.archived_segment_acknowledgement_senders.clone();
686
687        let maybe_sender = {
688            let mut archived_segment_acknowledgement_senders_guard =
689                archived_segment_acknowledgement_senders.lock();
690
691            (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
692                .then(|| {
693                    let last_key = *archived_segment_acknowledgement_senders_guard
694                        .senders
695                        .keys()
696                        .next()?;
697
698                    archived_segment_acknowledgement_senders_guard
699                        .senders
700                        .remove(&last_key)
701                })
702                .flatten()
703        };
704
705        if let Some(sender) = maybe_sender {
706            if let Err(error) = sender.unbounded_send(()) {
707                if !error.is_closed() {
708                    warn!("Failed to acknowledge archived segment: {error}");
709                }
710            }
711        }
712
713        debug!(%segment_index, "Acknowledged archived segment.");
714
715        Ok(())
716    }
717
718    // Note: this RPC uses the cached archived segment, which is only updated by archived segments subscriptions
719    fn piece(
720        &self,
721        ext: &Extensions,
722        requested_piece_index: PieceIndex,
723    ) -> Result<Option<Piece>, Error> {
724        check_if_safe(ext)?;
725
726        let archived_segment = {
727            let mut cached_archived_segment = self.cached_archived_segment.lock();
728
729            match cached_archived_segment
730                .as_ref()
731                .and_then(CachedArchivedSegment::get)
732            {
733                Some(archived_segment) => archived_segment,
734                None => {
735                    if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
736                        return Ok(None);
737                    }
738
739                    debug!(%requested_piece_index, "Re-creating genesis segment on demand");
740
741                    // Try to re-create genesis segment on demand
742                    match recreate_genesis_segment(
743                        &*self.client,
744                        self.kzg.clone(),
745                        self.erasure_coding.clone(),
746                    ) {
747                        Ok(Some(archived_segment)) => {
748                            let archived_segment = Arc::new(archived_segment);
749                            cached_archived_segment.replace(CachedArchivedSegment::Genesis(
750                                Arc::clone(&archived_segment),
751                            ));
752                            archived_segment
753                        }
754                        Ok(None) => {
755                            return Ok(None);
756                        }
757                        Err(error) => {
758                            error!(%error, "Failed to re-create genesis segment");
759
760                            return Err(Error::StringError(
761                                "Failed to re-create genesis segment".to_string(),
762                            ));
763                        }
764                    }
765                }
766            }
767        };
768
769        if requested_piece_index.segment_index() == archived_segment.segment_header.segment_index()
770        {
771            return Ok(archived_segment
772                .pieces
773                .pieces()
774                .nth(requested_piece_index.position() as usize));
775        }
776
777        Ok(None)
778    }
779
780    async fn segment_headers(
781        &self,
782        segment_indexes: Vec<SegmentIndex>,
783    ) -> Result<Vec<Option<SegmentHeader>>, Error> {
784        if segment_indexes.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
785            error!(
786                "segment_indexes length exceed the limit: {} ",
787                segment_indexes.len()
788            );
789
790            return Err(Error::StringError(format!(
791                "segment_indexes length exceed the limit {MAX_SEGMENT_HEADERS_PER_REQUEST}"
792            )));
793        };
794
795        Ok(segment_indexes
796            .into_iter()
797            .map(|segment_index| self.segment_headers_store.get_segment_header(segment_index))
798            .collect())
799    }
800
801    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
802        if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
803            error!(
804                "Request limit ({}) exceed the server limit: {} ",
805                limit, MAX_SEGMENT_HEADERS_PER_REQUEST
806            );
807
808            return Err(Error::StringError(format!(
809                "Request limit ({}) exceed the server limit: {} ",
810                limit, MAX_SEGMENT_HEADERS_PER_REQUEST
811            )));
812        };
813
814        let last_segment_index = self
815            .segment_headers_store
816            .max_segment_index()
817            .unwrap_or(SegmentIndex::ZERO);
818
819        let mut last_segment_headers = (SegmentIndex::ZERO..=last_segment_index)
820            .rev()
821            .take(limit as usize)
822            .map(|segment_index| self.segment_headers_store.get_segment_header(segment_index))
823            .collect::<Vec<_>>();
824
825        last_segment_headers.reverse();
826
827        Ok(last_segment_headers)
828    }
829
830    fn subscribe_object_mappings(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
831        if check_if_safe(ext).is_err() {
832            debug!("Unsafe subscribe_object_mappings ignored");
833            return;
834        }
835
836        let mapping_stream = self
837            .object_mapping_notification_stream
838            .subscribe()
839            .flat_map(|object_mapping_notification| {
840                let objects = object_mapping_notification.object_mapping;
841                let block_number = object_mapping_notification.block_number;
842
843                stream::iter(objects)
844                    .ready_chunks(OBJECT_MAPPING_BATCH_SIZE)
845                    .map(move |chunk| ObjectMappingResponse {
846                        block_number,
847                        objects: GlobalObjectMapping::from_objects(chunk.iter().cloned()),
848                    })
849            });
850
851        self.subscription_executor.spawn(
852            "subspace-archived-object-mappings-subscription",
853            Some("rpc"),
854            PendingSubscription::from(pending)
855                .pipe_from_stream(mapping_stream, BoundedVecDeque::default())
856                .boxed(),
857        );
858    }
859
860    fn subscribe_filtered_object_mappings(
861        &self,
862        pending: PendingSubscriptionSink,
863        ext: &Extensions,
864        hashes: Vec<Blake3Hash>,
865    ) {
866        if check_if_safe(ext).is_err() {
867            debug!("Unsafe subscribe_filtered_object_mappings ignored");
868            return;
869        }
870
871        if hashes.len() > MAX_OBJECT_HASHES_PER_SUBSCRIPTION {
872            error!(
873                "Request hash count ({}) exceed the server limit: {} ",
874                hashes.len(),
875                MAX_OBJECT_HASHES_PER_SUBSCRIPTION
876            );
877
878            let err_fut = pending.reject(Error::StringError(format!(
879                "Request hash count ({}) exceed the server limit: {} ",
880                hashes.len(),
881                MAX_OBJECT_HASHES_PER_SUBSCRIPTION
882            )));
883
884            self.subscription_executor.spawn(
885                "subspace-filtered-object-mappings-subscription",
886                Some("rpc"),
887                err_fut.boxed(),
888            );
889
890            return;
891        };
892
893        let mut hashes = HashSet::<Blake3Hash>::from_iter(hashes);
894        let hash_count = hashes.len();
895        let mut object_count = 0;
896
897        let mapping_stream = self
898            .object_mapping_notification_stream
899            .subscribe()
900            .flat_map(move |object_mapping_notification| {
901                let objects = object_mapping_notification.object_mapping;
902                let block_number = object_mapping_notification.block_number;
903
904                let filtered_objects = objects
905                    .into_iter()
906                    .filter(|object| hashes.remove(&object.hash))
907                    .collect::<Vec<_>>();
908
909                stream::iter(filtered_objects)
910                    // Typically batches will be larger than the hash limit, but we want to allow
911                    // CLI options to change that in future.
912                    .ready_chunks(OBJECT_MAPPING_BATCH_SIZE)
913                    .map(move |chunk| ObjectMappingResponse {
914                        block_number,
915                        objects: GlobalObjectMapping::from_objects(chunk.iter().cloned()),
916                    })
917            })
918            // Stop when we've returned mappings for all the hashes. Since we only yield each hash
919            // once, we don't need to check if hashes is empty here.
920            .take_while(move |mappings| {
921                object_count += mappings.objects.objects().len();
922                future::ready(object_count <= hash_count)
923            });
924
925        self.subscription_executor.spawn(
926            "subspace-filtered-object-mappings-subscription",
927            Some("rpc"),
928            PendingSubscription::from(pending)
929                .pipe_from_stream(mapping_stream, BoundedVecDeque::default())
930                .boxed(),
931        );
932    }
933}