1#![feature(
62 array_windows,
63 assert_matches,
64 box_into_inner,
65 duration_constructors_lite,
66 more_qualified_paths
67)]
68
69mod aux_schema;
70mod bundle_processor;
71mod bundle_producer_election_solver;
72mod domain_block_processor;
73pub mod domain_bundle_producer;
74pub mod domain_bundle_proposer;
75mod domain_worker;
76mod fetch_domain_bootstrap_info;
77mod fraud_proof;
78mod operator;
79pub mod snap_sync;
80#[cfg(test)]
81mod tests;
82mod utils;
83
84pub use self::aux_schema::load_execution_receipt;
85pub use self::fetch_domain_bootstrap_info::{BootstrapResult, fetch_domain_bootstrap_info};
86pub use self::operator::Operator;
87pub use self::utils::{DomainBlockImportNotification, OperatorSlotInfo};
88pub use domain_worker::OpaqueBundleFor;
89use futures::Stream;
90use futures::channel::mpsc;
91use sc_client_api::{AuxStore, BlockImportNotification};
92use sc_consensus::BoxBlockImport;
93use sc_network::service::traits::NetworkService;
94use sc_network_sync::SyncingService;
95use sc_network_sync::block_relay_protocol::BlockDownloader;
96use sc_network_sync::service::network::NetworkServiceHandle;
97use sc_transaction_pool_api::OffchainTransactionPoolFactory;
98use sc_utils::mpsc::TracingUnboundedSender;
99use snap_sync::ConsensusChainSyncParams;
100use sp_blockchain::HeaderBackend;
101use sp_consensus::SyncOracle;
102use sp_consensus_slots::Slot;
103use sp_domain_digests::AsPredigest;
104use sp_domains::bundle::Bundle;
105use sp_domains::execution_receipt::ExecutionReceiptFor as ExecutionReceipt;
106use sp_domains::{DomainId, OperatorId};
107use sp_keystore::KeystorePtr;
108use sp_runtime::DigestItem;
109use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
110use std::marker::PhantomData;
111use std::sync::Arc;
112use std::sync::atomic::{AtomicBool, Ordering};
113use subspace_core_primitives::pot::PotOutput;
114use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
115
/// Sync oracle for the domain chain.
///
/// Wraps an inner sync oracle together with an optional shared flag that records whether domain
/// snap sync has finished. The [`SyncOracle`] implementation for this type reports major syncing
/// while the inner oracle does so, or while the flag is present and still `false`.
///
/// The `SyncOracle + Send + Sync` requirements on `SO` are stated on the `impl` blocks that need
/// them rather than on the type itself, so the data structure does not force those bounds onto
/// every signature that merely mentions it.
#[derive(Debug, Clone)]
pub struct DomainChainSyncOracle<SO> {
    /// Shared "domain snap sync finished" flag; `None` when no such flag was supplied.
    domain_snap_sync_finished: Option<Arc<AtomicBool>>,
    /// The wrapped sync oracle that all queries are forwarded to.
    inner: SO,
}
128
129impl<SO> SyncOracle for DomainChainSyncOracle<SO>
130where
131 SO: SyncOracle + Send + Sync,
132{
133 fn is_major_syncing(&self) -> bool {
134 self.inner.is_major_syncing()
135 || self
136 .domain_snap_sync_finished
137 .as_ref()
138 .map(|sync_finished| !sync_finished.load(Ordering::Acquire))
139 .unwrap_or_default()
140 }
141
142 fn is_offline(&self) -> bool {
143 self.inner.is_offline()
144 }
145}
146
147impl<SO> DomainChainSyncOracle<SO>
148where
149 SO: SyncOracle + Send + Sync,
150{
151 pub fn new(sync_oracle: SO, domain_snap_sync_finished: Option<Arc<AtomicBool>>) -> Self {
153 Self {
154 domain_snap_sync_finished,
155 inner: sync_oracle,
156 }
157 }
158}
159
/// Execution receipt type built from the header of `Block`, the block type `CBlock` and
/// [`Balance`].
///
/// In this file `Block` is used as the domain block type and `CBlock` as the consensus block
/// type (see `load_execution_receipt_by_domain_hash`).
pub type ExecutionReceiptFor<Block, CBlock> = ExecutionReceipt<HeaderFor<Block>, CBlock, Balance>;
161
/// Tracing unbounded sender of [`Bundle`]s, parameterized by the extrinsic and header types of
/// `Block`, the block number and block hash types of `CBlock`, and [`Balance`].
type BundleSender<Block, CBlock> = TracingUnboundedSender<
    Bundle<ExtrinsicFor<Block>, NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>,
>;
165
/// Notification streams and a throttling buffer size, carried by [`OperatorParams`] in its
/// `operator_streams` field.
///
/// This struct places no bounds on its stream type parameters; the item types mentioned on the
/// fields below are the ones required by the `where` clause of [`OperatorParams`].
pub struct OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS> {
    /// Buffer size used for throttling consensus block import.
    // NOTE(review): how this size is applied is not visible here — presumably it bounds the
    // number of buffered consensus block import notifications; confirm against the consumer.
    pub consensus_block_import_throttling_buffer_size: u32,
    /// Stream of block-importing notifications; [`OperatorParams`] requires items of type
    /// `(NumberFor<CBlock>, mpsc::Sender<()>)`.
    // NOTE(review): presumably fired before the block import completes, with the sender used
    // as an acknowledgement channel — confirm.
    pub block_importing_notification_stream: IBNS,
    /// Stream of imported-block notifications; [`OperatorParams`] requires items of type
    /// `BlockImportNotification<CBlock>`.
    pub imported_block_notification_stream: CIBNS,
    /// Stream of new slot notifications; [`OperatorParams`] requires items of type
    /// `NewSlotNotification`, i.e. `(Slot, PotOutput)`.
    pub new_slot_notification_stream: NSNS,
    /// Stream of acknowledgement senders; [`OperatorParams`] requires items of type
    /// `mpsc::Sender<()>`.
    // NOTE(review): the consumer is not visible here; what is being acknowledged should be
    // confirmed before relying on this stream.
    pub acknowledgement_sender_stream: ASS,
    /// Marker that ties the otherwise unused `CBlock` type parameter to the struct.
    pub _phantom: PhantomData<CBlock>,
}
186
/// Item type of the new slot notification stream: a [`Slot`] paired with a [`PotOutput`].
type NewSlotNotification = (Slot, PotOutput);
188
/// Collection of handles, configuration values and notification streams for the operator.
///
/// `CBlock` is the consensus block type (it parameterizes the consensus-side fields such as
/// `consensus_offchain_tx_pool_factory`), while `Block` parameterizes the block import, syncing
/// service and block downloader fields.
// NOTE(review): presumably consumed when constructing `Operator`; the consumer is not visible in
// this part of the file.
pub struct OperatorParams<
    Block,
    CBlock,
    Client,
    CClient,
    TransactionPool,
    Backend,
    E,
    IBNS,
    CIBNS,
    NSNS,
    ASS,
> where
    Block: BlockT,
    CBlock: BlockT,
    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
    NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
{
    /// Identifier of the domain.
    pub domain_id: DomainId,
    /// Consensus block number associated with the creation of the domain.
    // NOTE(review): presumably the consensus block at which the domain was instantiated —
    // confirm against the caller.
    pub domain_created_at: NumberFor<CBlock>,
    /// Shared handle to the consensus chain client.
    pub consensus_client: Arc<CClient>,
    /// Factory for offchain transaction pools of the consensus chain.
    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Type-erased sync oracle for the domain chain.
    pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    /// Shared handle to the client.
    // NOTE(review): presumably the domain chain client, as the counterpart of
    // `consensus_client` — confirm.
    pub client: Arc<Client>,
    /// Shared transaction pool.
    pub transaction_pool: Arc<TransactionPool>,
    /// Shared backend.
    pub backend: Arc<Backend>,
    /// Shared code executor.
    pub code_executor: Arc<E>,
    /// Operator identifier, if one is configured.
    // NOTE(review): the meaning of `None` (e.g. node not acting as an operator) is not visible
    // here — confirm.
    pub maybe_operator_id: Option<OperatorId>,
    /// Keystore handle.
    pub keystore: KeystorePtr,
    /// Shared sender of bundles (see `BundleSender`).
    pub bundle_sender: Arc<BundleSender<Block, CBlock>>,
    /// Notification streams and throttling buffer size, see [`OperatorStreams`].
    pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
    /// Confirmation depth `k` of the consensus chain, expressed as a consensus block number.
    pub consensus_confirmation_depth_k: NumberFor<CBlock>,
    /// Challenge period, expressed as a consensus block number.
    pub challenge_period: NumberFor<CBlock>,
    /// Shared boxed block import for `Block`.
    pub block_import: Arc<BoxBlockImport<Block>>,
    /// Flag controlling whether empty bundle production is skipped.
    // NOTE(review): exact effect is defined by the bundle producer, not visible here.
    pub skip_empty_bundle_production: bool,
    /// Flag controlling whether out-of-order slots are skipped.
    // NOTE(review): exact effect is defined by the consumer, not visible here.
    pub skip_out_of_order_slot: bool,
    /// Shared syncing service for `Block`.
    pub sync_service: Arc<SyncingService<Block>>,
    /// Type-erased network service.
    pub network_service: Arc<dyn NetworkService>,
    /// Type-erased block downloader for `Block`.
    pub block_downloader: Arc<dyn BlockDownloader<Block>>,
    /// Optional consensus chain sync parameters (see `snap_sync::ConsensusChainSyncParams`).
    pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, Block::Header>>,
    /// Optional fork identifier of the domain, as a string.
    pub domain_fork_id: Option<String>,
    /// Handle to the domain network service.
    pub domain_network_service_handle: NetworkServiceHandle,
}
234
235pub fn load_execution_receipt_by_domain_hash<Block, CBlock, Client>(
236 domain_client: &Client,
237 domain_hash: Block::Hash,
238 domain_number: NumberFor<Block>,
239) -> Result<ExecutionReceiptFor<Block, CBlock>, sp_blockchain::Error>
240where
241 Block: BlockT,
242 CBlock: BlockT,
243 Client: AuxStore + HeaderBackend<Block>,
244{
245 let domain_header = domain_client.header(domain_hash)?.ok_or_else(|| {
246 sp_blockchain::Error::Backend(format!(
247 "Header for domain block {domain_hash}#{domain_number} not found"
248 ))
249 })?;
250
251 let consensus_block_hash = domain_header
252 .digest()
253 .convert_first(DigestItem::as_consensus_block_info)
254 .ok_or_else(|| {
255 sp_blockchain::Error::Application(format!(
256 "Domain block header {domain_hash}#{domain_number} must have consensus block info predigest"
257 ).into())
258 })?;
259
260 load_execution_receipt::<_, Block, CBlock>(domain_client, consensus_block_hash)?.ok_or_else(
262 || {
263 sp_blockchain::Error::Backend(format!(
264 "Receipt for consensus block {consensus_block_hash} and domain block \
265 {domain_hash}#{domain_number} not found"
266 ))
267 },
268 )
269}