1#![feature(
62 array_windows,
63 assert_matches,
64 box_into_inner,
65 duration_constructors_lite,
66 more_qualified_paths
67)]
68
69mod aux_schema;
70mod bundle_processor;
71mod bundle_producer_election_solver;
72mod domain_block_processor;
73pub mod domain_bundle_producer;
74pub mod domain_bundle_proposer;
75mod domain_worker;
76mod fetch_domain_bootstrap_info;
77mod fraud_proof;
78mod operator;
79pub mod snap_sync;
80#[cfg(test)]
81mod tests;
82mod utils;
83
84pub use self::aux_schema::load_execution_receipt;
85pub use self::fetch_domain_bootstrap_info::{BootstrapResult, fetch_domain_bootstrap_info};
86pub use self::operator::Operator;
87pub use self::utils::{DomainBlockImportNotification, OperatorSlotInfo};
88pub use domain_worker::OpaqueBundleFor;
89use futures::Stream;
90use futures::channel::mpsc;
91use sc_client_api::{AuxStore, BlockImportNotification};
92use sc_consensus::BoxBlockImport;
93use sc_network::service::traits::NetworkService;
94use sc_network_sync::SyncingService;
95use sc_network_sync::block_relay_protocol::BlockDownloader;
96use sc_network_sync::service::network::NetworkServiceHandle;
97use sc_transaction_pool_api::OffchainTransactionPoolFactory;
98use sc_utils::mpsc::TracingUnboundedSender;
99use snap_sync::ConsensusChainSyncParams;
100use sp_blockchain::HeaderBackend;
101use sp_consensus::SyncOracle;
102use sp_consensus_slots::Slot;
103use sp_domain_digests::AsPredigest;
104use sp_domains::{Bundle, DomainId, ExecutionReceiptFor as ExecutionReceipt, OperatorId};
105use sp_keystore::KeystorePtr;
106use sp_runtime::DigestItem;
107use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
108use std::marker::PhantomData;
109use std::sync::Arc;
110use std::sync::atomic::{AtomicBool, Ordering};
111use subspace_core_primitives::pot::PotOutput;
112use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
113
/// A [`SyncOracle`] wrapper that pairs an inner oracle with an optional
/// "domain snap sync finished" flag.
///
/// As implemented by this type's [`SyncOracle`] impl, the wrapper reports major
/// syncing when the inner oracle does, or when the flag is present and still `false`.
#[derive(Debug, Clone)]
pub struct DomainChainSyncOracle<SO>
where
    SO: SyncOracle + Send + Sync,
{
    // Shared flag; `None` means the flag never contributes to `is_major_syncing`.
    domain_snap_sync_finished: Option<Arc<AtomicBool>>,
    // The wrapped oracle that all queries are delegated to.
    inner: SO,
}
126
127impl<SO> SyncOracle for DomainChainSyncOracle<SO>
128where
129 SO: SyncOracle + Send + Sync,
130{
131 fn is_major_syncing(&self) -> bool {
132 self.inner.is_major_syncing()
133 || self
134 .domain_snap_sync_finished
135 .as_ref()
136 .map(|sync_finished| !sync_finished.load(Ordering::Acquire))
137 .unwrap_or_default()
138 }
139
140 fn is_offline(&self) -> bool {
141 self.inner.is_offline()
142 }
143}
144
145impl<SO> DomainChainSyncOracle<SO>
146where
147 SO: SyncOracle + Send + Sync,
148{
149 pub fn new(sync_oracle: SO, domain_snap_sync_finished: Option<Arc<AtomicBool>>) -> Self {
151 Self {
152 domain_snap_sync_finished,
153 inner: sync_oracle,
154 }
155 }
156}
157
/// Execution receipt type specialised to the domain block's header (`HeaderFor<Block>`),
/// the consensus block type `CBlock`, and [`Balance`].
pub type ExecutionReceiptFor<Block, CBlock> = ExecutionReceipt<HeaderFor<Block>, CBlock, Balance>;
159
/// Unbounded tracing sender of [`Bundle`]s built from the domain block's extrinsic and
/// header types and the consensus block's number and hash types.
type BundleSender<Block, CBlock> = TracingUnboundedSender<
    Bundle<ExtrinsicFor<Block>, NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>,
>;
163
/// Bundle of notification streams (plus one buffer size setting) embedded in
/// [`OperatorParams`] as `operator_streams`.
///
/// The stream type parameters are unconstrained here; [`OperatorParams`] is where their
/// `Stream` item types are bounded.
pub struct OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS> {
    // NOTE(review): presumably the buffer size used to throttle consensus block import
    // notifications; the consumer is not visible in this chunk. Confirm.
    pub consensus_block_import_throttling_buffer_size: u32,
    // In `OperatorParams`, `IBNS` yields `(NumberFor<CBlock>, mpsc::Sender<()>)` items.
    pub block_importing_notification_stream: IBNS,
    // In `OperatorParams`, `CIBNS` yields `BlockImportNotification<CBlock>` items.
    pub imported_block_notification_stream: CIBNS,
    // In `OperatorParams`, `NSNS` yields `NewSlotNotification` items.
    pub new_slot_notification_stream: NSNS,
    // In `OperatorParams`, `ASS` yields `mpsc::Sender<()>` items.
    pub acknowledgement_sender_stream: ASS,
    // Marker that ties the otherwise unused `CBlock` parameter to the struct.
    pub _phantom: PhantomData<CBlock>,
}
184
/// A `(Slot, PotOutput)` pair; the item type required of the `NSNS` stream in
/// [`OperatorParams`].
type NewSlotNotification = (Slot, PotOutput);
186
/// Collection of handles, configuration values and streams describing an operator setup.
///
/// `Block` is the domain block type and `CBlock` the consensus block type (see the
/// `Block`/`CBlock` usage in [`ExecutionReceiptFor`]). The four stream parameters must be
/// `Send + 'static` streams with the item types spelled out in the `where` clause.
///
/// NOTE(review): presumably consumed when constructing an [`Operator`]; the consumer is
/// not visible in this chunk, so the per-field notes below are based on names and types
/// only and should be confirmed against it.
pub struct OperatorParams<
    Block,
    CBlock,
    Client,
    CClient,
    TransactionPool,
    Backend,
    E,
    IBNS,
    CIBNS,
    NSNS,
    ASS,
> where
    Block: BlockT,
    CBlock: BlockT,
    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
    NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
{
    pub domain_id: DomainId,
    // Expressed as a consensus chain block number.
    pub domain_created_at: NumberFor<CBlock>,
    // Shared handle to the consensus chain client.
    pub consensus_client: Arc<CClient>,
    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    // Type-erased sync oracle; any `SyncOracle + Send + Sync` implementation fits.
    pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    // Shared handle to the domain chain client.
    pub client: Arc<Client>,
    pub transaction_pool: Arc<TransactionPool>,
    pub backend: Arc<Backend>,
    pub code_executor: Arc<E>,
    // `None` when no operator id is configured.
    pub maybe_operator_id: Option<OperatorId>,
    pub keystore: KeystorePtr,
    // Shared sender for bundles (see `BundleSender`).
    pub bundle_sender: Arc<BundleSender<Block, CBlock>>,
    pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
    // Both values below are expressed in consensus chain block numbers.
    pub consensus_confirmation_depth_k: NumberFor<CBlock>,
    pub challenge_period: NumberFor<CBlock>,
    // Boxed block import for domain blocks.
    pub block_import: Arc<BoxBlockImport<Block>>,
    pub skip_empty_bundle_production: bool,
    pub skip_out_of_order_slot: bool,
    pub sync_service: Arc<SyncingService<Block>>,
    pub network_service: Arc<dyn NetworkService>,
    pub block_downloader: Arc<dyn BlockDownloader<Block>>,
    // Optional; NOTE(review): presumably only set when snap sync is requested — confirm.
    pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, Block::Header>>,
    pub domain_fork_id: Option<String>,
    pub domain_network_service_handle: NetworkServiceHandle,
}
232
233pub fn load_execution_receipt_by_domain_hash<Block, CBlock, Client>(
234 domain_client: &Client,
235 domain_hash: Block::Hash,
236 domain_number: NumberFor<Block>,
237) -> Result<ExecutionReceiptFor<Block, CBlock>, sp_blockchain::Error>
238where
239 Block: BlockT,
240 CBlock: BlockT,
241 Client: AuxStore + HeaderBackend<Block>,
242{
243 let domain_header = domain_client.header(domain_hash)?.ok_or_else(|| {
244 sp_blockchain::Error::Backend(format!(
245 "Header for domain block {domain_hash}#{domain_number} not found"
246 ))
247 })?;
248
249 let consensus_block_hash = domain_header
250 .digest()
251 .convert_first(DigestItem::as_consensus_block_info)
252 .ok_or_else(|| {
253 sp_blockchain::Error::Application(format!(
254 "Domain block header {domain_hash}#{domain_number} must have consensus block info predigest"
255 ).into())
256 })?;
257
258 crate::aux_schema::load_execution_receipt::<_, Block, CBlock>(
260 domain_client,
261 consensus_block_hash,
262 )?
263 .ok_or_else(|| {
264 sp_blockchain::Error::Backend(format!(
265 "Receipt for consensus block {consensus_block_hash} and domain block \
266 {domain_hash}#{domain_number} not found"
267 ))
268 })
269}