1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
use crate::malicious_bundle_producer::MaliciousBundleProducer;
use crate::{create_malicious_operator_configuration, DomainCli};
use cross_domain_message_gossip::{ChainMsg, Message};
use domain_client_operator::{BootstrapResult, OperatorStreams};
use domain_eth_service::provider::EthProvider;
use domain_eth_service::DefaultEthConfig;
use domain_runtime_primitives::opaque::Block as DomainBlock;
use domain_service::providers::DefaultProvider;
use domain_service::{FullBackend, FullClient};
use evm_domain_runtime::AccountId as AccountId20;
use futures::StreamExt;
use sc_cli::CliConfiguration;
use sc_consensus_subspace::block_import::BlockImportingNotification;
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::slot_worker::NewSlotNotification;
use sc_network::NetworkPeers;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::SubspaceApi;
use sp_core::crypto::AccountId32;
use sp_core::traits::SpawnEssentialNamed;
use sp_domains::{DomainInstanceData, RuntimeType};
use sp_keystore::KeystorePtr;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_runtime::RuntimeApi as CRuntimeApi;
use subspace_runtime_primitives::opaque::Block as CBlock;
use subspace_runtime_primitives::AccountId;
use subspace_service::FullClient as CFullClient;

/// `DomainInstanceStarter` is used to start a domain instance node based on the given
/// bootstrap result.
///
/// All fields are consumed by [`DomainInstanceStarter::start`], which wires them into the
/// domain node and the malicious bundle producer.
pub struct DomainInstanceStarter {
    /// Domain CLI options; `start` reads the domain id, the chain id and the additional
    /// arguments from it and passes it to the operator configuration builder.
    pub domain_cli: DomainCli,
    /// Base path handed to the operator configuration builder.
    pub base_path: PathBuf,
    /// Tokio runtime handle handed to the operator configuration builder.
    pub tokio_handle: tokio::runtime::Handle,
    /// Consensus chain client; used to query the chain constants and shared with both the
    /// domain node and the malicious bundle producer.
    pub consensus_client: Arc<CFullClient<CRuntimeApi>>,
    /// Consensus chain keystore, passed to the malicious bundle producer.
    pub consensus_keystore: KeystorePtr,
    /// Offchain transaction pool factory of the consensus chain, shared with both the domain
    /// node and the malicious bundle producer.
    pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory<CBlock>,
    /// Source of consensus block importing notifications; `start` subscribes to it and keeps
    /// the block number and the acknowledgement sender of each notification.
    pub block_importing_notification_stream:
        SubspaceNotificationStream<BlockImportingNotification<CBlock>>,
    /// Source of new slot notifications; `start` subscribes to it once for the operator
    /// streams and once for the malicious bundle producer.
    pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
    /// Consensus chain syncing service, passed to the domain node as its network sync oracle.
    pub consensus_sync_service: Arc<sc_network_sync::SyncingService<CBlock>>,
    /// Receiving end for `ChainMsg` values, forwarded to the domain node.
    pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
    /// Sending end for gossip `Message` values, forwarded to the domain node.
    pub gossip_message_sink: TracingUnboundedSender<Message>,
    /// Consensus chain network handle, forwarded to the domain node.
    pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
}

impl DomainInstanceStarter {
    pub async fn start(
        self,
        bootstrap_result: BootstrapResult<CBlock>,
        sudo_account: AccountId,
    ) -> Result<Arc<sc_domains::RuntimeExecutor>, Box<dyn std::error::Error>> {
        let BootstrapResult {
            domain_instance_data,
            domain_created_at,
            imported_block_notification_stream,
        } = bootstrap_result;

        let DomainInstanceData {
            runtime_type,
            raw_genesis,
        } = domain_instance_data;

        let DomainInstanceStarter {
            domain_cli,
            base_path,
            tokio_handle,
            consensus_client,
            consensus_keystore,
            consensus_offchain_tx_pool_factory,
            block_importing_notification_stream,
            new_slot_notification_stream,
            consensus_sync_service,
            domain_message_receiver,
            gossip_message_sink,
            consensus_network,
        } = self;

        let domain_id = domain_cli.domain_id.into();
        let domain_config = {
            let chain_id = domain_cli.run.chain_id(domain_cli.run.is_dev()?)?;
            let domain_spec =
                crate::chain_spec::create_domain_spec(chain_id.as_str(), raw_genesis)?;
            create_malicious_operator_configuration::<DomainCli>(
                domain_id,
                base_path.into(),
                &domain_cli,
                domain_spec,
                tokio_handle,
            )?
        };

        let block_importing_notification_stream = block_importing_notification_stream
            .subscribe()
            .then(|block_importing_notification| async move {
                (
                    block_importing_notification.block_number,
                    block_importing_notification.acknowledgement_sender,
                )
            });

        let new_slot_notification_stream = || {
            new_slot_notification_stream
                .subscribe()
                .then(|slot_notification| async move {
                    (
                        slot_notification.new_slot_info.slot,
                        slot_notification.new_slot_info.proof_of_time,
                    )
                })
        };

        let operator_streams = OperatorStreams {
            // TODO: proper value
            consensus_block_import_throttling_buffer_size: 10,
            block_importing_notification_stream,
            imported_block_notification_stream,
            new_slot_notification_stream: new_slot_notification_stream(),
            acknowledgement_sender_stream: futures::stream::empty(),
            _phantom: Default::default(),
        };

        let consensus_best_hash = consensus_client.info().best_hash;
        let chain_constants = consensus_client
            .runtime_api()
            .chain_constants(consensus_best_hash)?;

        match runtime_type {
            RuntimeType::Evm => {
                let evm_base_path = domain_config
                    .base_path
                    .config_dir(domain_config.chain_spec.id());

                let eth_provider =
                    EthProvider::<
                        evm_domain_runtime::TransactionConverter,
                        DefaultEthConfig<
                            FullClient<DomainBlock, evm_domain_runtime::RuntimeApi>,
                            FullBackend<DomainBlock>,
                        >,
                    >::new(Some(&evm_base_path), domain_cli.additional_args());

                let domain_params = domain_service::DomainParams {
                    domain_id,
                    domain_config,
                    domain_created_at,
                    consensus_client: consensus_client.clone(),
                    consensus_offchain_tx_pool_factory: consensus_offchain_tx_pool_factory.clone(),
                    consensus_network,
                    consensus_network_sync_oracle: consensus_sync_service.clone(),
                    operator_streams,
                    gossip_message_sink,
                    domain_message_receiver,
                    provider: eth_provider,
                    skip_empty_bundle_production: true,
                    skip_out_of_order_slot: false,
                    // Always set it to `None` to not running the normal bundle producer
                    maybe_operator_id: None,
                    confirmation_depth_k: chain_constants.confirmation_depth_k(),
                };

                let mut domain_node = domain_service::new_full::<
                    _,
                    _,
                    _,
                    _,
                    _,
                    _,
                    evm_domain_runtime::RuntimeApi,
                    AccountId20,
                    _,
                >(domain_params)
                .await?;

                let malicious_bundle_producer = MaliciousBundleProducer::new(
                    domain_id,
                    domain_node.client.clone(),
                    consensus_client,
                    consensus_keystore,
                    consensus_offchain_tx_pool_factory,
                    domain_node.transaction_pool.clone(),
                    sudo_account,
                );

                domain_node
                    .task_manager
                    .spawn_essential_handle()
                    .spawn_essential_blocking(
                        "malicious-bundle-producer",
                        None,
                        Box::pin(malicious_bundle_producer.start(new_slot_notification_stream())),
                    );

                domain_node.network_starter.start_network();

                domain_node.task_manager.future().await?;

                Ok(domain_node.code_executor.clone())
            }
            RuntimeType::AutoId => {
                let domain_params = domain_service::DomainParams {
                    domain_id,
                    domain_config,
                    domain_created_at,
                    consensus_client: consensus_client.clone(),
                    consensus_offchain_tx_pool_factory: consensus_offchain_tx_pool_factory.clone(),
                    consensus_network,
                    consensus_network_sync_oracle: consensus_sync_service.clone(),
                    operator_streams,
                    gossip_message_sink,
                    domain_message_receiver,
                    provider: DefaultProvider,
                    skip_empty_bundle_production: true,
                    skip_out_of_order_slot: false,
                    // Always set it to `None` to not running the normal bundle producer
                    maybe_operator_id: None,
                    confirmation_depth_k: chain_constants.confirmation_depth_k(),
                };

                let mut domain_node = domain_service::new_full::<
                    _,
                    _,
                    _,
                    _,
                    _,
                    _,
                    auto_id_domain_runtime::RuntimeApi,
                    AccountId32,
                    _,
                >(domain_params)
                .await?;

                let malicious_bundle_producer = MaliciousBundleProducer::new(
                    domain_id,
                    domain_node.client.clone(),
                    consensus_client,
                    consensus_keystore,
                    consensus_offchain_tx_pool_factory,
                    domain_node.transaction_pool.clone(),
                    sudo_account,
                );

                domain_node
                    .task_manager
                    .spawn_essential_handle()
                    .spawn_essential_blocking(
                        "malicious-bundle-producer",
                        None,
                        Box::pin(malicious_bundle_producer.start(new_slot_notification_stream())),
                    );

                domain_node.network_starter.start_network();

                domain_node.task_manager.future().await?;

                Ok(domain_node.code_executor.clone())
            }
        }
    }
}