1#![feature(try_blocks)]
4
5use futures::channel::mpsc;
6use futures::{future, stream, FutureExt, StreamExt};
7use jsonrpsee::core::async_trait;
8use jsonrpsee::proc_macros::rpc;
9use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
10use jsonrpsee::{Extensions, PendingSubscriptionSink};
11use parking_lot::Mutex;
12use sc_client_api::{AuxStore, BlockBackend};
13use sc_consensus_subspace::archiver::{
14 recreate_genesis_segment, ArchivedSegmentNotification, ObjectMappingNotification,
15 SegmentHeadersStore,
16};
17use sc_consensus_subspace::notification::SubspaceNotificationStream;
18use sc_consensus_subspace::slot_worker::{
19 NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle,
20};
21use sc_rpc::utils::{BoundedVecDeque, PendingSubscription};
22use sc_rpc::SubscriptionTaskExecutor;
23use sc_rpc_api::{check_if_safe, UnsafeRpcError};
24use sc_utils::mpsc::TracingUnboundedSender;
25use schnellru::{ByLength, LruMap};
26use sp_api::{ApiError, ProvideRuntimeApi};
27use sp_blockchain::HeaderBackend;
28use sp_consensus::SyncOracle;
29use sp_consensus_subspace::{ChainConstants, SubspaceApi};
30use sp_core::H256;
31use sp_objects::ObjectsApi;
32use sp_runtime::traits::Block as BlockT;
33use std::collections::hash_map::Entry;
34use std::collections::{HashMap, HashSet};
35use std::marker::PhantomData;
36use std::sync::atomic::{AtomicU64, Ordering};
37use std::sync::{Arc, Weak};
38use std::time::Duration;
39use subspace_archiving::archiver::NewArchivedSegment;
40use subspace_core_primitives::hashes::Blake3Hash;
41use subspace_core_primitives::objects::GlobalObjectMapping;
42use subspace_core_primitives::pieces::{Piece, PieceIndex};
43use subspace_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
44use subspace_core_primitives::solutions::Solution;
45use subspace_core_primitives::{BlockHash, PublicKey, SlotNumber};
46use subspace_erasure_coding::ErasureCoding;
47use subspace_farmer_components::FarmerProtocolInfo;
48use subspace_kzg::Kzg;
49use subspace_networking::libp2p::Multiaddr;
50use subspace_rpc_primitives::{
51 FarmerAppInfo, ObjectMappingResponse, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
52 SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST,
53};
54use tracing::{debug, error, warn};
55
/// Base value for the JSON-RPC error codes produced by this module; the conversion of
/// `Error::StringError` into an error object uses `SUBSPACE_ERROR + 1`.
const SUBSPACE_ERROR: i32 = 9000;
/// Capacity of the bounded `mpsc` channel created per slot in `subscribe_slot_info` to carry
/// solutions submitted through `submit_solution_response`.
const SOLUTION_SENDER_CHANNEL_CAPACITY: usize = 9;
/// How long the task spawned in `subscribe_reward_signing` waits for a reward signature before
/// it finishes without forwarding anything.
const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500);

/// Maximum number of object mappings placed into a single `ObjectMappingResponse` by the object
/// mapping subscriptions.
const OBJECT_MAPPING_BATCH_SIZE: usize = 1000;

/// Maximum number of hashes accepted by `subscribe_filtered_object_mappings`; larger requests
/// are rejected.
const MAX_OBJECT_HASHES_PER_SUBSCRIPTION: usize = 1000;
75
/// Errors returned by the RPC methods in this module.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Free-form error; the contained string is used verbatim as the error message.
    #[error("{0}")]
    StringError(String),
    /// Produced by `check_if_safe` when the call is not permitted; displayed and converted
    /// exactly like the wrapped `UnsafeRpcError`.
    #[error(transparent)]
    UnsafeRpcCalled(#[from] UnsafeRpcError),
}
87
88impl From<Error> for ErrorObjectOwned {
89 fn from(error: Error) -> Self {
90 match error {
91 Error::StringError(e) => ErrorObject::owned(SUBSPACE_ERROR + 1, e, None::<()>),
92 Error::UnsafeRpcCalled(e) => e.into(),
93 }
94 }
95}
96
/// Subspace RPC API; the `rpc` macro is asked to generate both client and server code for it.
///
/// Items marked `with_extensions` receive the connection `Extensions` on the server side, which
/// the implementation in this file passes to `check_if_safe`.
#[rpc(client, server)]
pub trait SubspaceRpcApi {
    /// Returns general chain and protocol information (`FarmerAppInfo`).
    #[method(name = "subspace_getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submits a solution for the slot named in `solution_response`.
    #[method(name = "subspace_submitSolutionResponse", with_extensions)]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Subscription delivering a `SlotInfo` item for every new slot notification.
    #[subscription(
        name = "subspace_subscribeSlotInfo" => "subspace_slot_info",
        unsubscribe = "subspace_unsubscribeSlotInfo",
        item = SlotInfo,
        with_extensions,
    )]
    fn subscribe_slot_info(&self);

    /// Subscription delivering `RewardSigningInfo` items, i.e. requests to sign a hash.
    #[subscription(
        name = "subspace_subscribeRewardSigning" => "subspace_reward_signing",
        unsubscribe = "subspace_unsubscribeRewardSigning",
        item = RewardSigningInfo,
        with_extensions,
    )]
    fn subscribe_reward_signing(&self);

    /// Submits a reward signature in response to an item from the reward signing subscription.
    #[method(name = "subspace_submitRewardSignature", with_extensions)]
    fn submit_reward_signature(
        &self,
        reward_signature: RewardSignatureResponse,
    ) -> Result<(), Error>;

    /// Subscription delivering the `SegmentHeader` of newly archived segments.
    #[subscription(
        name = "subspace_subscribeArchivedSegmentHeader" => "subspace_archived_segment_header",
        unsubscribe = "subspace_unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
        with_extensions,
    )]
    fn subscribe_archived_segment_header(&self);

    /// Returns the segment header (or `None`) for each requested segment index, in request
    /// order.
    #[method(name = "subspace_segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indexes: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Returns the piece with the given index if the server currently has it available.
    ///
    /// Declared `blocking`.
    #[method(name = "subspace_piece", blocking, with_extensions)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Acknowledges receipt of the archived segment header with the given segment index.
    #[method(name = "subspace_acknowledgeArchivedSegmentHeader", with_extensions)]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

    /// Returns up to `limit` of the most recent segment headers.
    #[method(name = "subspace_lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Subscription delivering all object mappings, in batches.
    #[subscription(
        name = "subspace_subscribeObjectMappings" => "subspace_object_mappings",
        unsubscribe = "subspace_unsubscribeObjectMappings",
        item = ObjectMappingResponse,
        with_extensions,
    )]
    fn subscribe_object_mappings(&self);

    /// Subscription delivering only the object mappings whose hash is in `hashes`.
    #[subscription(
        name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings",
        unsubscribe = "subspace_unsubscribeFilteredObjectMappings",
        item = ObjectMappingResponse,
        with_extensions,
    )]
    fn subscribe_filtered_object_mappings(&self, hashes: Vec<Blake3Hash>);
}
176
/// Acknowledgement senders for the archived segment currently being announced to subscribers.
#[derive(Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    /// Segment index that `senders` belong to; when a notification for a different segment
    /// arrives, `senders` is cleared and this is updated.
    segment_index: SegmentIndex,
    /// Pending acknowledgement senders keyed by subscription ID.
    senders: HashMap<u64, TracingUnboundedSender<()>>,
}
182
/// One-shot senders waiting for a reward signature over a particular hash.
#[derive(Default)]
struct BlockSignatureSenders {
    /// Hash that `senders` belong to; when a notification for a different hash arrives,
    /// `senders` is cleared and this is updated.
    current_hash: H256,
    /// Senders through which a submitted `RewardSignatureResponse` is handed to the forwarding
    /// task; `submit_reward_signature` pops one per submission.
    senders: Vec<async_oneshot::Sender<RewardSignatureResponse>>,
}
188
/// Archived segment remembered by the RPC server so that `piece` can serve pieces from it.
enum CachedArchivedSegment {
    /// Strong reference; stored by `piece` after the genesis segment is re-created on demand.
    Genesis(Arc<NewArchivedSegment>),
    /// Weak reference; stored by the archived segment header subscription, so the cache alone
    /// does not keep the segment alive.
    Weak(Weak<NewArchivedSegment>),
}
199
200impl CachedArchivedSegment {
201 fn get(&self) -> Option<Arc<NewArchivedSegment>> {
202 match self {
203 CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
204 CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
205 }
206 }
207}
208
/// Everything needed to construct a [`SubspaceRpc`] via [`SubspaceRpc::new`].
pub struct SubspaceRpcConfig<Client, SO, AS>
where
    SO: SyncOracle + Send + Sync + Clone + 'static,
    AS: AuxStore + Send + Sync + 'static,
{
    /// Client used for chain info and runtime API access.
    pub client: Arc<Client>,
    /// Executor on which subscription and forwarding tasks are spawned.
    pub subscription_executor: SubscriptionTaskExecutor,
    /// Source of new slot notifications.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Source of reward signing notifications.
    pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    /// Source of object mapping notifications.
    pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    /// Source of archived segment notifications.
    pub archived_segment_notification_stream:
        SubspaceNotificationStream<ArchivedSegmentNotification>,
    /// Bootstrap node addresses reported to clients in `FarmerAppInfo`.
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Store from which segment headers are read.
    pub segment_headers_store: SegmentHeadersStore<AS>,
    /// Oracle used to report whether the node is major syncing.
    pub sync_oracle: SubspaceSyncOracle<SO>,
    /// KZG instance passed to genesis segment re-creation.
    pub kzg: Kzg,
    /// Erasure coding instance passed to genesis segment re-creation.
    pub erasure_coding: ErasureCoding,
}
239
/// Server-side state behind the `SubspaceRpcApi` implementation.
pub struct SubspaceRpc<Block, Client, SO, AS>
where
    Block: BlockT,
    SO: SyncOracle + Send + Sync + Clone + 'static,
{
    client: Arc<Client>,
    subscription_executor: SubscriptionTaskExecutor,
    new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
    object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
    archived_segment_notification_stream: SubspaceNotificationStream<ArchivedSegmentNotification>,
    // Per-slot senders used to route submitted solutions; bounded LRU map keyed by slot number.
    #[allow(clippy::type_complexity)]
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution<PublicKey>>>>>,
    // Senders waiting for a reward signature over the current hash.
    reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    segment_headers_store: SegmentHeadersStore<AS>,
    // Most recently seen archived segment (or the re-created genesis segment), used by `piece`.
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    // Acknowledgement senders for the archived segment currently being announced.
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    // Counter handing out IDs to archived segment header subscriptions.
    next_subscription_id: AtomicU64,
    sync_oracle: SubspaceSyncOracle<SO>,
    // Values below are read once in `new` and reused for every request.
    genesis_hash: BlockHash,
    chain_constants: ChainConstants,
    max_pieces_in_sector: u16,
    kzg: Kzg,
    erasure_coding: ErasureCoding,
    _block: PhantomData<Block>,
}
269
270impl<Block, Client, SO, AS> SubspaceRpc<Block, Client, SO, AS>
278where
279 Block: BlockT,
280 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
281 Client::Api: SubspaceApi<Block, PublicKey>,
282 SO: SyncOracle + Send + Sync + Clone + 'static,
283 AS: AuxStore + Send + Sync + 'static,
284{
285 pub fn new(config: SubspaceRpcConfig<Client, SO, AS>) -> Result<Self, ApiError> {
287 let info = config.client.info();
288 let best_hash = info.best_hash;
289 let genesis_hash = BlockHash::try_from(info.genesis_hash.as_ref())
290 .expect("Genesis hash must always be convertible into BlockHash; qed");
291 let runtime_api = config.client.runtime_api();
292 let chain_constants = runtime_api.chain_constants(best_hash)?;
293 let max_pieces_in_sector = runtime_api.max_pieces_in_sector(best_hash)?;
297 let block_authoring_delay = u64::from(chain_constants.block_authoring_delay());
298 let block_authoring_delay = usize::try_from(block_authoring_delay)
299 .expect("Block authoring delay will never exceed usize on any platform; qed");
300 let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
301 .expect("Always a tiny constant in the protocol; qed");
302
303 Ok(Self {
304 client: config.client,
305 subscription_executor: config.subscription_executor,
306 new_slot_notification_stream: config.new_slot_notification_stream,
307 reward_signing_notification_stream: config.reward_signing_notification_stream,
308 object_mapping_notification_stream: config.object_mapping_notification_stream,
309 archived_segment_notification_stream: config.archived_segment_notification_stream,
310 solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new(
311 solution_response_senders_capacity,
312 )))),
313 reward_signature_senders: Arc::default(),
314 dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
315 segment_headers_store: config.segment_headers_store,
316 cached_archived_segment: Arc::default(),
317 archived_segment_acknowledgement_senders: Arc::default(),
318 next_subscription_id: AtomicU64::default(),
319 sync_oracle: config.sync_oracle,
320 genesis_hash,
321 chain_constants,
322 max_pieces_in_sector,
323 kzg: config.kzg,
324 erasure_coding: config.erasure_coding,
325 _block: PhantomData,
326 })
327 }
328}
329
330#[async_trait]
331impl<Block, Client, SO, AS> SubspaceRpcApiServer for SubspaceRpc<Block, Client, SO, AS>
332where
333 Block: BlockT,
334 Client: ProvideRuntimeApi<Block>
335 + HeaderBackend<Block>
336 + BlockBackend<Block>
337 + Send
338 + Sync
339 + 'static,
340 Client::Api: ObjectsApi<Block>,
341 SO: SyncOracle + Send + Sync + Clone + 'static,
342 AS: AuxStore + Send + Sync + 'static,
343{
344 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
345 let last_segment_index = self
346 .segment_headers_store
347 .max_segment_index()
348 .unwrap_or(SegmentIndex::ZERO);
349
350 let farmer_app_info: Result<FarmerAppInfo, ApiError> = try {
351 let chain_constants = &self.chain_constants;
352 let protocol_info = FarmerProtocolInfo {
353 history_size: HistorySize::from(last_segment_index),
354 max_pieces_in_sector: self.max_pieces_in_sector,
355 recent_segments: chain_constants.recent_segments(),
356 recent_history_fraction: chain_constants.recent_history_fraction(),
357 min_sector_lifetime: chain_constants.min_sector_lifetime(),
358 };
359
360 FarmerAppInfo {
361 genesis_hash: self.genesis_hash,
362 dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
363 syncing: self.sync_oracle.is_major_syncing(),
364 farming_timeout: chain_constants
365 .slot_duration()
366 .as_duration()
367 .mul_f64(SlotNumber::from(chain_constants.block_authoring_delay()) as f64),
368 protocol_info,
369 }
370 };
371
372 farmer_app_info.map_err(|error| {
373 error!("Failed to get data from runtime API: {}", error);
374 Error::StringError("Internal error".to_string())
375 })
376 }
377
378 fn submit_solution_response(
379 &self,
380 ext: &Extensions,
381 solution_response: SolutionResponse,
382 ) -> Result<(), Error> {
383 check_if_safe(ext)?;
384
385 let slot = solution_response.slot_number;
386 let mut solution_response_senders = self.solution_response_senders.lock();
387
388 let success = solution_response_senders
389 .peek_mut(&slot)
390 .and_then(|sender| sender.try_send(solution_response.solution).ok())
391 .is_some();
392
393 if !success {
394 warn!(
395 %slot,
396 "Solution was ignored, likely because farmer was too slow"
397 );
398
399 return Err(Error::StringError("Solution was ignored".to_string()));
400 }
401
402 Ok(())
403 }
404
    /// Pipes a `SlotInfo` item to the subscriber for every new slot notification.
    ///
    /// When the connection passes `check_if_safe`, the first notification seen for a slot also
    /// registers a per-slot channel, so that `submit_solution_response` can route solutions
    /// back to the `solution_sender` carried by the notification.
    fn subscribe_slot_info(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
        let executor = self.subscription_executor.clone();
        let solution_response_senders = self.solution_response_senders.clone();
        // Unsafe connections still receive slot info, but no solution channel is set up for them.
        let allow_solutions = check_if_safe(ext).is_ok();

        let handle_slot_notification = move |new_slot_notification| {
            let NewSlotNotification {
                new_slot_info,
                mut solution_sender,
            } = new_slot_notification;

            let slot_number = SlotNumber::from(new_slot_info.slot);

            if allow_solutions {
                // The map is shared by all subscriptions; the guard is held until the end of
                // this `if` block, so check-and-insert is atomic.
                let mut solution_response_senders = solution_response_senders.lock();
                // Only the first subscription to observe this slot registers a channel.
                if solution_response_senders.peek(&slot_number).is_none() {
                    let (response_sender, mut response_receiver) =
                        mpsc::channel(SOLUTION_SENDER_CHANNEL_CAPACITY);

                    solution_response_senders.insert(slot_number, response_sender);

                    // Forwards every solution received for this slot; the loop ends once the
                    // response channel is closed.
                    let forward_solution_fut = async move {
                        while let Some(solution) = response_receiver.next().await {
                            let public_key = solution.public_key;
                            let sector_index = solution.sector_index;

                            // Rebuilt field by field into the `Solution` type that
                            // `solution_sender` accepts.
                            let solution = Solution {
                                public_key,
                                reward_address: solution.reward_address,
                                sector_index,
                                history_size: solution.history_size,
                                piece_offset: solution.piece_offset,
                                record_commitment: solution.record_commitment,
                                record_witness: solution.record_witness,
                                chunk: solution.chunk,
                                chunk_witness: solution.chunk_witness,
                                proof_of_space: solution.proof_of_space,
                            };

                            // A failed send is only logged; later solutions are still tried.
                            if solution_sender.try_send(solution).is_err() {
                                warn!(
                                    slot = %slot_number,
                                    %sector_index,
                                    %public_key,
                                    "Solution receiver is closed, likely because farmer was too slow"
                                );
                            }
                        }
                    };

                    executor.spawn(
                        "subspace-slot-info-forward",
                        Some("rpc"),
                        Box::pin(forward_solution_fut),
                    );
                }
            }

            let global_challenge = new_slot_info
                .proof_of_time
                .derive_global_randomness()
                .derive_global_challenge(slot_number);

            // This is the item delivered to the subscriber.
            SlotInfo {
                slot_number,
                global_challenge,
                solution_range: new_slot_info.solution_range,
                voting_solution_range: new_slot_info.voting_solution_range,
            }
        };
        let stream = self
            .new_slot_notification_stream
            .subscribe()
            .map(handle_slot_notification);

        self.subscription_executor.spawn(
            "subspace-slot-info-subscription",
            Some("rpc"),
            PendingSubscription::from(pending)
                .pipe_from_stream(stream, BoundedVecDeque::default())
                .boxed(),
        );
    }
494
    /// Pipes a `RewardSigningInfo` item to the subscriber for every reward signing notification
    /// and arranges for a signature submitted via `submit_reward_signature` to be forwarded to
    /// the notification's `signature_sender`.
    ///
    /// Connections that fail `check_if_safe` are ignored: the function returns without using
    /// `pending`.
    fn subscribe_reward_signing(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
        if check_if_safe(ext).is_err() {
            debug!("Unsafe subscribe_reward_signing ignored");
            return;
        }

        let executor = self.subscription_executor.clone();
        let reward_signature_senders = self.reward_signature_senders.clone();

        let stream = self.reward_signing_notification_stream.subscribe().map(
            move |reward_signing_notification| {
                let RewardSigningNotification {
                    hash,
                    public_key,
                    signature_sender,
                } = reward_signing_notification;

                let (response_sender, response_receiver) = async_oneshot::oneshot();

                // Register the sender for this hash; the lock is released at the end of this
                // block, before any task is spawned.
                {
                    let mut reward_signature_senders = reward_signature_senders.lock();

                    // Senders for a previous hash are stale and dropped.
                    if reward_signature_senders.current_hash != hash {
                        reward_signature_senders.current_hash = hash;
                        reward_signature_senders.senders.clear();
                    }

                    reward_signature_senders.senders.push(response_sender);
                }

                // Forwards the signature, if the response contains one; send errors are
                // ignored.
                let forward_signature_fut = async move {
                    if let Ok(reward_signature) = response_receiver.await {
                        if let Some(signature) = reward_signature.signature {
                            let _ = signature_sender.unbounded_send(signature);
                        }
                    }
                };

                // The task finishes as soon as either the timeout elapses or the forwarding
                // future completes, whichever happens first.
                executor.spawn(
                    "subspace-block-signing-forward",
                    Some("rpc"),
                    future::select(
                        futures_timer::Delay::new(REWARD_SIGNING_TIMEOUT),
                        Box::pin(forward_signature_fut),
                    )
                    .map(|_| ())
                    .boxed(),
                );

                // This is the item delivered to the subscriber.
                RewardSigningInfo {
                    hash: hash.into(),
                    public_key,
                }
            },
        );

        self.subscription_executor.spawn(
            "subspace-block-signing-subscription",
            Some("rpc"),
            PendingSubscription::from(pending)
                .pipe_from_stream(stream, BoundedVecDeque::default())
                .boxed(),
        );
    }
565
566 fn submit_reward_signature(
567 &self,
568 ext: &Extensions,
569 reward_signature: RewardSignatureResponse,
570 ) -> Result<(), Error> {
571 check_if_safe(ext)?;
572
573 let reward_signature_senders = self.reward_signature_senders.clone();
574
575 let mut reward_signature_senders = reward_signature_senders.lock();
578
579 if reward_signature_senders.current_hash == reward_signature.hash.into() {
580 if let Some(mut sender) = reward_signature_senders.senders.pop() {
581 let _ = sender.send(reward_signature);
582 }
583 }
584
585 Ok(())
586 }
587
    /// Pipes the `SegmentHeader` of archived segment notifications to the subscriber.
    ///
    /// For connections that pass `check_if_safe`, each subscription registers the
    /// notification's acknowledgement sender under its own subscription ID (consumed later by
    /// `acknowledge_archived_segment_header`) and the segment is cached weakly for `piece`.
    /// Other connections just receive every segment header.
    fn subscribe_archived_segment_header(
        &self,
        pending: PendingSubscriptionSink,
        ext: &Extensions,
    ) {
        let archived_segment_acknowledgement_senders =
            self.archived_segment_acknowledgement_senders.clone();

        let cached_archived_segment = Arc::clone(&self.cached_archived_segment);
        // ID identifying this subscription among the acknowledgement senders.
        let subscription_id = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
        let allow_acknowledgements = check_if_safe(ext).is_ok();

        let stream = self
            .archived_segment_notification_stream
            .subscribe()
            .filter_map(move |archived_segment_notification| {
                let ArchivedSegmentNotification {
                    archived_segment,
                    acknowledgement_sender,
                } = archived_segment_notification;

                let segment_index = archived_segment.segment_header.segment_index();

                let maybe_archived_segment_header = if allow_acknowledgements {
                    let mut archived_segment_acknowledgement_senders =
                        archived_segment_acknowledgement_senders.lock();

                    // A new segment invalidates all senders kept for the previous one.
                    if archived_segment_acknowledgement_senders.segment_index != segment_index {
                        archived_segment_acknowledgement_senders.segment_index = segment_index;
                        archived_segment_acknowledgement_senders.senders.clear();
                    }

                    let maybe_archived_segment_header =
                        match archived_segment_acknowledgement_senders
                            .senders
                            .entry(subscription_id)
                        {
                            Entry::Occupied(_) => {
                                // A sender for this segment is already registered and not yet
                                // consumed, so the header is not forwarded again.
                                None
                            }
                            Entry::Vacant(entry) => {
                                entry.insert(acknowledgement_sender);

                                // Forwarded to the subscriber.
                                Some(archived_segment.segment_header)
                            }
                        };

                    // Only a weak reference is cached; the cache does not extend the
                    // segment's lifetime.
                    cached_archived_segment
                        .lock()
                        .replace(CachedArchivedSegment::Weak(Arc::downgrade(
                            &archived_segment,
                        )));

                    maybe_archived_segment_header
                } else {
                    // Without acknowledgements the header is always forwarded.
                    Some(archived_segment.segment_header)
                };

                Box::pin(async move { maybe_archived_segment_header })
            });

        let archived_segment_acknowledgement_senders =
            self.archived_segment_acknowledgement_senders.clone();
        let fut = async move {
            PendingSubscription::from(pending)
                .pipe_from_stream(stream, BoundedVecDeque::default())
                .await;

            // The subscription has ended: drop any sender still registered for it.
            let mut archived_segment_acknowledgement_senders =
                archived_segment_acknowledgement_senders.lock();

            archived_segment_acknowledgement_senders
                .senders
                .remove(&subscription_id);
        };

        self.subscription_executor.spawn(
            "subspace-archived-segment-header-subscription",
            Some("rpc"),
            fut.boxed(),
        );
    }
676
677 async fn acknowledge_archived_segment_header(
678 &self,
679 ext: &Extensions,
680 segment_index: SegmentIndex,
681 ) -> Result<(), Error> {
682 check_if_safe(ext)?;
683
684 let archived_segment_acknowledgement_senders =
685 self.archived_segment_acknowledgement_senders.clone();
686
687 let maybe_sender = {
688 let mut archived_segment_acknowledgement_senders_guard =
689 archived_segment_acknowledgement_senders.lock();
690
691 (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
692 .then(|| {
693 let last_key = *archived_segment_acknowledgement_senders_guard
694 .senders
695 .keys()
696 .next()?;
697
698 archived_segment_acknowledgement_senders_guard
699 .senders
700 .remove(&last_key)
701 })
702 .flatten()
703 };
704
705 if let Some(sender) = maybe_sender {
706 if let Err(error) = sender.unbounded_send(()) {
707 if !error.is_closed() {
708 warn!("Failed to acknowledge archived segment: {error}");
709 }
710 }
711 }
712
713 debug!(%segment_index, "Acknowledged archived segment.");
714
715 Ok(())
716 }
717
718 fn piece(
720 &self,
721 ext: &Extensions,
722 requested_piece_index: PieceIndex,
723 ) -> Result<Option<Piece>, Error> {
724 check_if_safe(ext)?;
725
726 let archived_segment = {
727 let mut cached_archived_segment = self.cached_archived_segment.lock();
728
729 match cached_archived_segment
730 .as_ref()
731 .and_then(CachedArchivedSegment::get)
732 {
733 Some(archived_segment) => archived_segment,
734 None => {
735 if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
736 return Ok(None);
737 }
738
739 debug!(%requested_piece_index, "Re-creating genesis segment on demand");
740
741 match recreate_genesis_segment(
743 &*self.client,
744 self.kzg.clone(),
745 self.erasure_coding.clone(),
746 ) {
747 Ok(Some(archived_segment)) => {
748 let archived_segment = Arc::new(archived_segment);
749 cached_archived_segment.replace(CachedArchivedSegment::Genesis(
750 Arc::clone(&archived_segment),
751 ));
752 archived_segment
753 }
754 Ok(None) => {
755 return Ok(None);
756 }
757 Err(error) => {
758 error!(%error, "Failed to re-create genesis segment");
759
760 return Err(Error::StringError(
761 "Failed to re-create genesis segment".to_string(),
762 ));
763 }
764 }
765 }
766 }
767 };
768
769 if requested_piece_index.segment_index() == archived_segment.segment_header.segment_index()
770 {
771 return Ok(archived_segment
772 .pieces
773 .pieces()
774 .nth(requested_piece_index.position() as usize));
775 }
776
777 Ok(None)
778 }
779
780 async fn segment_headers(
781 &self,
782 segment_indexes: Vec<SegmentIndex>,
783 ) -> Result<Vec<Option<SegmentHeader>>, Error> {
784 if segment_indexes.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
785 error!(
786 "segment_indexes length exceed the limit: {} ",
787 segment_indexes.len()
788 );
789
790 return Err(Error::StringError(format!(
791 "segment_indexes length exceed the limit {MAX_SEGMENT_HEADERS_PER_REQUEST}"
792 )));
793 };
794
795 Ok(segment_indexes
796 .into_iter()
797 .map(|segment_index| self.segment_headers_store.get_segment_header(segment_index))
798 .collect())
799 }
800
801 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
802 if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
803 error!(
804 "Request limit ({}) exceed the server limit: {} ",
805 limit, MAX_SEGMENT_HEADERS_PER_REQUEST
806 );
807
808 return Err(Error::StringError(format!(
809 "Request limit ({}) exceed the server limit: {} ",
810 limit, MAX_SEGMENT_HEADERS_PER_REQUEST
811 )));
812 };
813
814 let last_segment_index = self
815 .segment_headers_store
816 .max_segment_index()
817 .unwrap_or(SegmentIndex::ZERO);
818
819 let mut last_segment_headers = (SegmentIndex::ZERO..=last_segment_index)
820 .rev()
821 .take(limit as usize)
822 .map(|segment_index| self.segment_headers_store.get_segment_header(segment_index))
823 .collect::<Vec<_>>();
824
825 last_segment_headers.reverse();
826
827 Ok(last_segment_headers)
828 }
829
830 fn subscribe_object_mappings(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
831 if check_if_safe(ext).is_err() {
832 debug!("Unsafe subscribe_object_mappings ignored");
833 return;
834 }
835
836 let mapping_stream = self
837 .object_mapping_notification_stream
838 .subscribe()
839 .flat_map(|object_mapping_notification| {
840 let objects = object_mapping_notification.object_mapping;
841 let block_number = object_mapping_notification.block_number;
842
843 stream::iter(objects)
844 .ready_chunks(OBJECT_MAPPING_BATCH_SIZE)
845 .map(move |chunk| ObjectMappingResponse {
846 block_number,
847 objects: GlobalObjectMapping::from_objects(chunk.iter().cloned()),
848 })
849 });
850
851 self.subscription_executor.spawn(
852 "subspace-archived-object-mappings-subscription",
853 Some("rpc"),
854 PendingSubscription::from(pending)
855 .pipe_from_stream(mapping_stream, BoundedVecDeque::default())
856 .boxed(),
857 );
858 }
859
860 fn subscribe_filtered_object_mappings(
861 &self,
862 pending: PendingSubscriptionSink,
863 ext: &Extensions,
864 hashes: Vec<Blake3Hash>,
865 ) {
866 if check_if_safe(ext).is_err() {
867 debug!("Unsafe subscribe_filtered_object_mappings ignored");
868 return;
869 }
870
871 if hashes.len() > MAX_OBJECT_HASHES_PER_SUBSCRIPTION {
872 error!(
873 "Request hash count ({}) exceed the server limit: {} ",
874 hashes.len(),
875 MAX_OBJECT_HASHES_PER_SUBSCRIPTION
876 );
877
878 let err_fut = pending.reject(Error::StringError(format!(
879 "Request hash count ({}) exceed the server limit: {} ",
880 hashes.len(),
881 MAX_OBJECT_HASHES_PER_SUBSCRIPTION
882 )));
883
884 self.subscription_executor.spawn(
885 "subspace-filtered-object-mappings-subscription",
886 Some("rpc"),
887 err_fut.boxed(),
888 );
889
890 return;
891 };
892
893 let mut hashes = HashSet::<Blake3Hash>::from_iter(hashes);
894 let hash_count = hashes.len();
895 let mut object_count = 0;
896
897 let mapping_stream = self
898 .object_mapping_notification_stream
899 .subscribe()
900 .flat_map(move |object_mapping_notification| {
901 let objects = object_mapping_notification.object_mapping;
902 let block_number = object_mapping_notification.block_number;
903
904 let filtered_objects = objects
905 .into_iter()
906 .filter(|object| hashes.remove(&object.hash))
907 .collect::<Vec<_>>();
908
909 stream::iter(filtered_objects)
910 .ready_chunks(OBJECT_MAPPING_BATCH_SIZE)
913 .map(move |chunk| ObjectMappingResponse {
914 block_number,
915 objects: GlobalObjectMapping::from_objects(chunk.iter().cloned()),
916 })
917 })
918 .take_while(move |mappings| {
921 object_count += mappings.objects.objects().len();
922 future::ready(object_count <= hash_count)
923 });
924
925 self.subscription_executor.spawn(
926 "subspace-filtered-object-mappings-subscription",
927 Some("rpc"),
928 PendingSubscription::from(pending)
929 .pipe_from_stream(mapping_stream, BoundedVecDeque::default())
930 .boxed(),
931 );
932 }
933}