1#![feature(array_windows, assert_matches, box_into_inner, more_qualified_paths)]
62
63mod aux_schema;
64mod bundle_processor;
65mod bundle_producer_election_solver;
66mod domain_block_processor;
67pub mod domain_bundle_producer;
68pub mod domain_bundle_proposer;
69mod domain_worker;
70mod fetch_domain_bootstrap_info;
71mod fraud_proof;
72mod operator;
73pub mod snap_sync;
74#[cfg(test)]
75mod tests;
76mod utils;
77
78pub use self::aux_schema::load_execution_receipt;
79pub use self::fetch_domain_bootstrap_info::{BootstrapResult, fetch_domain_bootstrap_info};
80pub use self::operator::Operator;
81pub use self::utils::{DomainBlockImportNotification, OperatorSlotInfo};
82pub use domain_worker::OpaqueBundleFor;
83use futures::Stream;
84use futures::channel::mpsc;
85use sc_client_api::{AuxStore, BlockImportNotification};
86use sc_consensus::BoxBlockImport;
87use sc_network::service::traits::NetworkService;
88use sc_network_sync::SyncingService;
89use sc_network_sync::block_relay_protocol::BlockDownloader;
90use sc_network_sync::service::network::NetworkServiceHandle;
91use sc_transaction_pool_api::OffchainTransactionPoolFactory;
92use sc_utils::mpsc::TracingUnboundedSender;
93use snap_sync::ConsensusChainSyncParams;
94use sp_blockchain::HeaderBackend;
95use sp_consensus::SyncOracle;
96use sp_consensus_slots::Slot;
97use sp_domain_digests::AsPredigest;
98use sp_domains::bundle::Bundle;
99use sp_domains::execution_receipt::ExecutionReceiptFor as ExecutionReceipt;
100use sp_domains::{DomainId, OperatorId};
101use sp_keystore::KeystorePtr;
102use sp_runtime::DigestItem;
103use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
104use std::marker::PhantomData;
105use std::sync::Arc;
106use std::sync::atomic::{AtomicBool, Ordering};
107use subspace_core_primitives::pot::PotOutput;
108use subspace_runtime_primitives::{Balance, BlockHashFor, ExtrinsicFor, HeaderFor};
109
/// A [`SyncOracle`] wrapper that pairs an inner oracle with an optional shared flag.
///
/// While the flag is present and still `false`, `is_major_syncing` reports `true` even if the
/// inner oracle does not; `is_offline` is answered by the inner oracle alone.
#[derive(Debug, Clone)]
pub struct DomainChainSyncOracle<SO>
where
    SO: SyncOracle + Send + Sync,
{
    /// Optional shared flag, read with `Ordering::Acquire`. `None` means only the inner oracle
    /// decides the sync status.
    /// NOTE(review): by its name the flag becomes `true` once domain snap sync completes; the
    /// writer is not visible in this file section — confirm.
    domain_snap_sync_finished: Option<Arc<AtomicBool>>,
    /// The wrapped sync oracle.
    inner: SO,
}
122
123impl<SO> SyncOracle for DomainChainSyncOracle<SO>
124where
125 SO: SyncOracle + Send + Sync,
126{
127 fn is_major_syncing(&self) -> bool {
128 self.inner.is_major_syncing()
129 || self
130 .domain_snap_sync_finished
131 .as_ref()
132 .map(|sync_finished| !sync_finished.load(Ordering::Acquire))
133 .unwrap_or_default()
134 }
135
136 fn is_offline(&self) -> bool {
137 self.inner.is_offline()
138 }
139}
140
141impl<SO> DomainChainSyncOracle<SO>
142where
143 SO: SyncOracle + Send + Sync,
144{
145 pub fn new(sync_oracle: SO, domain_snap_sync_finished: Option<Arc<AtomicBool>>) -> Self {
147 Self {
148 domain_snap_sync_finished,
149 inner: sync_oracle,
150 }
151 }
152}
153
/// Execution receipt specialised to the header type of `Block`, the block type `CBlock` and
/// [`Balance`].
///
/// Elsewhere in this file `Block` is the domain block type and `CBlock` the consensus block type.
pub type ExecutionReceiptFor<Block, CBlock> = ExecutionReceipt<HeaderFor<Block>, CBlock, Balance>;
155
/// Unbounded tracing sender of [`Bundle`]s.
///
/// The bundle is parameterised by the extrinsic and header types of `Block`, the block number
/// and block hash types of `CBlock`, and [`Balance`].
type BundleSender<Block, CBlock> = TracingUnboundedSender<
    Bundle<ExtrinsicFor<Block>, NumberFor<CBlock>, BlockHashFor<CBlock>, HeaderFor<Block>, Balance>,
>;
159
/// Notification streams handed to the operator, plus a throttling buffer size.
///
/// This struct places no bounds on its stream parameters. `OperatorParams`, which embeds it,
/// requires each of them to be a `Stream + Send + 'static` with the item type noted per field.
pub struct OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS> {
    /// Buffer size for throttling consensus block import.
    /// NOTE(review): meaning inferred from the field name; the consumer is not in view — confirm.
    pub consensus_block_import_throttling_buffer_size: u32,
    /// Block-importing notifications. `OperatorParams` requires items of type
    /// `(NumberFor<CBlock>, mpsc::Sender<()>)`.
    pub block_importing_notification_stream: IBNS,
    /// Imported-block notifications. `OperatorParams` requires items of type
    /// `BlockImportNotification<CBlock>`.
    pub imported_block_notification_stream: CIBNS,
    /// New-slot notifications. `OperatorParams` requires items of type `NewSlotNotification`,
    /// i.e. `(Slot, PotOutput)`.
    pub new_slot_notification_stream: NSNS,
    /// Stream of acknowledgement senders. `OperatorParams` requires items of type
    /// `mpsc::Sender<()>`.
    pub acknowledgement_sender_stream: ASS,
    /// Marker for `CBlock`, which no other field of this struct mentions.
    pub _phantom: PhantomData<CBlock>,
}
180
/// Item carried by the new-slot notification stream: a [`Slot`] paired with a [`PotOutput`].
type NewSlotNotification = (Slot, PotOutput);
182
/// Bundle of everything needed to set up a domain operator.
///
/// `Block` is the domain block type and `CBlock` the consensus block type. The four stream
/// parameters (`IBNS`, `CIBNS`, `NSNS`, `ASS`) are forwarded to [`OperatorStreams`] and are
/// bounded here by the item types they must yield.
///
/// NOTE(review): the code consuming this struct is not visible in this file section; field
/// descriptions below that go beyond the declared types are inferred from names — confirm.
pub struct OperatorParams<
    Block,
    CBlock,
    Client,
    CClient,
    TransactionPool,
    Backend,
    E,
    IBNS,
    CIBNS,
    NSNS,
    ASS,
> where
    Block: BlockT,
    CBlock: BlockT,
    IBNS: Stream<Item = (NumberFor<CBlock>, mpsc::Sender<()>)> + Send + 'static,
    CIBNS: Stream<Item = BlockImportNotification<CBlock>> + Send + 'static,
    NSNS: Stream<Item = NewSlotNotification> + Send + 'static,
    ASS: Stream<Item = mpsc::Sender<()>> + Send + 'static,
{
    /// Identifier of the domain.
    pub domain_id: DomainId,
    /// A consensus chain block number; presumably the block at which the domain was created.
    pub domain_created_at: NumberFor<CBlock>,
    /// Shared handle to the consensus chain client.
    pub consensus_client: Arc<CClient>,
    /// Factory for offchain transaction pools over consensus chain blocks.
    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Type-erased sync oracle for the domain chain.
    pub domain_sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    /// Shared handle to the domain client.
    pub client: Arc<Client>,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TransactionPool>,
    /// Shared handle to the backend.
    pub backend: Arc<Backend>,
    /// Shared handle to the code executor.
    pub code_executor: Arc<E>,
    /// Operator id, if any. NOTE(review): the meaning of `None` is not shown here — confirm.
    pub maybe_operator_id: Option<OperatorId>,
    /// Keystore handle.
    pub keystore: KeystorePtr,
    /// Shared sender used to emit bundles (see `BundleSender`).
    pub bundle_sender: Arc<BundleSender<Block, CBlock>>,
    /// Notification streams and throttling buffer size.
    pub operator_streams: OperatorStreams<CBlock, IBNS, CIBNS, NSNS, ASS>,
    /// Confirmation depth `k`, expressed in consensus chain block numbers.
    pub consensus_confirmation_depth_k: NumberFor<CBlock>,
    /// Challenge period, expressed in consensus chain block numbers.
    pub challenge_period: NumberFor<CBlock>,
    /// Shared boxed block import for domain blocks.
    pub block_import: Arc<BoxBlockImport<Block>>,
    /// Flag; presumably disables production of empty bundles when `true` — TODO confirm.
    pub skip_empty_bundle_production: bool,
    /// Flag; presumably makes out-of-order slots be skipped when `true` — TODO confirm.
    pub skip_out_of_order_slot: bool,
    /// Syncing service for the domain chain.
    pub sync_service: Arc<SyncingService<Block>>,
    /// Type-erased network service.
    pub network_service: Arc<dyn NetworkService>,
    /// Type-erased downloader of domain blocks.
    pub block_downloader: Arc<dyn BlockDownloader<Block>>,
    /// Optional consensus chain sync parameters (declared in the `snap_sync` module).
    pub consensus_chain_sync_params: Option<ConsensusChainSyncParams<CBlock, Block::Header>>,
    /// Optional fork id of the domain.
    pub domain_fork_id: Option<String>,
    /// Handle to the domain network service.
    pub domain_network_service_handle: NetworkServiceHandle,
}
228
229pub fn load_execution_receipt_by_domain_hash<Block, CBlock, Client>(
230 domain_client: &Client,
231 domain_hash: Block::Hash,
232 domain_number: NumberFor<Block>,
233) -> Result<ExecutionReceiptFor<Block, CBlock>, sp_blockchain::Error>
234where
235 Block: BlockT,
236 CBlock: BlockT,
237 Client: AuxStore + HeaderBackend<Block>,
238{
239 let domain_header = domain_client.header(domain_hash)?.ok_or_else(|| {
240 sp_blockchain::Error::Backend(format!(
241 "Header for domain block {domain_hash}#{domain_number} not found"
242 ))
243 })?;
244
245 let consensus_block_hash = domain_header
246 .digest()
247 .convert_first(DigestItem::as_consensus_block_info)
248 .ok_or_else(|| {
249 sp_blockchain::Error::Application(format!(
250 "Domain block header {domain_hash}#{domain_number} must have consensus block info predigest"
251 ).into())
252 })?;
253
254 load_execution_receipt::<_, Block, CBlock>(domain_client, consensus_block_hash)?.ok_or_else(
256 || {
257 sp_blockchain::Error::Backend(format!(
258 "Receipt for consensus block {consensus_block_hash} and domain block \
259 {domain_hash}#{domain_number} not found"
260 ))
261 },
262 )
263}