Skip to content

Commit 31957d4

Browse files
tomip01 ilitteri
authored and committed
fix(l2): shutdown node with ctrl+c (#5208)
**Motivation**

The L2 node does not stop after pressing `ctrl+c`. This is because aborting the `l2_sequencer` task does not stop the blocking tasks, such as the block producer and the L1 committer.

**Description**

- Add a new `InMessage` variant (`Abort`) to stop the L1 committer and block producer tasks.
- Change `start_l2` to return the handles of the blocking tasks.

---------

Co-authored-by: Ivan Litteri <67517699+ilitteri@users.noreply.github.com>
1 parent 265352c commit 31957d4

File tree

4 files changed

+125
-55
lines changed

4 files changed

+125
-55
lines changed

cmd/ethrex/l2/initializers.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use ethrex_blockchain::{Blockchain, BlockchainType, L2Config};
1111
use ethrex_common::fd_limit::raise_fd_limit;
1212
use ethrex_common::types::fee_config::{FeeConfig, L1FeeConfig, OperatorFeeConfig};
1313
use ethrex_common::{Address, types::DEFAULT_BUILDER_GAS_CEIL};
14+
use ethrex_l2::sequencer::block_producer;
15+
use ethrex_l2::sequencer::l1_committer;
1416
use ethrex_l2::sequencer::l1_committer::regenerate_head_state;
1517
use ethrex_p2p::{
1618
discv4::peer_table::PeerTable,
@@ -24,6 +26,7 @@ use ethrex_storage::Store;
2426
use ethrex_storage_rollup::{EngineTypeRollup, StoreRollup};
2527
use eyre::OptionExt;
2628
use secp256k1::SecretKey;
29+
use spawned_concurrency::tasks::GenServerHandle;
2730
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
2831
use tokio::task::JoinSet;
2932
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -133,14 +136,36 @@ pub fn init_tracing(opts: &L2Options) -> Option<reload::Handle<EnvFilter, Regist
133136
}
134137
}
135138

139+
async fn shutdown_sequencer_handles(
140+
committer_handle: Option<GenServerHandle<l1_committer::L1Committer>>,
141+
block_producer_handle: Option<GenServerHandle<block_producer::BlockProducer>>,
142+
) {
143+
// These GenServers run via start_blocking, so aborting the JoinSet alone never stops them.
144+
// Sending Abort elicits CastResponse::Stop and lets the blocking loop unwind cleanly.
145+
if let Some(mut handle) = committer_handle {
146+
handle
147+
.cast(l1_committer::InMessage::Abort)
148+
.await
149+
.inspect_err(|err| warn!("Failed to send committer abort: {err:?}"))
150+
.ok();
151+
}
152+
if let Some(mut handle) = block_producer_handle {
153+
handle
154+
.cast(block_producer::InMessage::Abort)
155+
.await
156+
.inspect_err(|err| warn!("Failed to send block producer abort: {err:?}"))
157+
.ok();
158+
}
159+
}
160+
136161
pub async fn init_l2(
137162
opts: L2Options,
138163
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
139164
) -> eyre::Result<()> {
140165
raise_fd_limit()?;
141-
142166
let datadir = opts.node_opts.datadir.clone();
143167
init_datadir(&opts.node_opts.datadir);
168+
144169
let rollup_store_dir = datadir.join("rollup_store");
145170

146171
// Checkpoints are stored in the main datadir
@@ -280,14 +305,12 @@ pub async fn init_l2(
280305
}
281306

282307
let sequencer_cancellation_token = CancellationToken::new();
283-
284308
let l2_url = Url::parse(&format!(
285309
"http://{}:{}",
286310
opts.node_opts.http_addr, opts.node_opts.http_port
287311
))
288312
.map_err(|err| eyre::eyre!("Failed to parse L2 RPC URL: {err}"))?;
289-
290-
let l2_sequencer = ethrex_l2::start_l2(
313+
let (committer_handle, block_producer_handle, l2_sequencer) = ethrex_l2::start_l2(
291314
store,
292315
rollup_store,
293316
blockchain,
@@ -297,15 +320,19 @@ pub async fn init_l2(
297320
genesis,
298321
checkpoints_dir,
299322
)
300-
.into_future();
301-
323+
.await?;
302324
join_set.spawn(l2_sequencer);
303325

304326
tokio::select! {
305327
_ = tokio::signal::ctrl_c() => {
328+
shutdown_sequencer_handles(
329+
committer_handle.clone(),
330+
block_producer_handle.clone()
331+
).await;
306332
join_set.abort_all();
307333
}
308334
_ = sequencer_cancellation_token.cancelled() => {
335+
shutdown_sequencer_handles(committer_handle.clone(), block_producer_handle.clone()).await;
309336
}
310337
}
311338
info!("Server shut down started...");

crates/l2/sequencer/block_producer.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub enum CallMessage {
4343
#[derive(Clone)]
4444
pub enum InMessage {
4545
Produce,
46+
Abort,
4647
}
4748

4849
#[derive(Clone)]
@@ -260,22 +261,30 @@ impl GenServer for BlockProducer {
260261

261262
async fn handle_cast(
262263
&mut self,
263-
_message: Self::CastMsg,
264+
message: Self::CastMsg,
264265
handle: &GenServerHandle<Self>,
265266
) -> CastResponse {
266-
// Right now we only have the Produce message, so we ignore the message
267-
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
268-
let _ = self
269-
.produce_block()
270-
.await
271-
.inspect_err(|e| error!("Block Producer Error: {e}"));
267+
match message {
268+
InMessage::Produce => {
269+
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
270+
let _ = self
271+
.produce_block()
272+
.await
273+
.inspect_err(|e| error!("Block Producer Error: {e}"));
274+
}
275+
send_after(
276+
Duration::from_millis(self.block_time_ms),
277+
handle.clone(),
278+
Self::CastMsg::Produce,
279+
);
280+
CastResponse::NoReply
281+
}
282+
InMessage::Abort => {
283+
// start_blocking keeps this GenServer alive even if the JoinSet aborts the task.
284+
// Returning CastResponse::Stop is how the blocking runner actually shuts down.
285+
CastResponse::Stop
286+
}
272287
}
273-
send_after(
274-
Duration::from_millis(self.block_time_ms),
275-
handle.clone(),
276-
Self::CastMsg::Produce,
277-
);
278-
CastResponse::NoReply
279288
}
280289

281290
async fn handle_call(

crates/l2/sequencer/l1_committer.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ pub enum CallMessage {
8181
#[derive(Clone)]
8282
pub enum InMessage {
8383
Commit,
84+
Abort,
8485
}
8586

8687
#[derive(Clone)]
@@ -1086,21 +1087,8 @@ impl L1Committer {
10861087
on_chain_proposer_address: self.on_chain_proposer_address,
10871088
})))
10881089
}
1089-
}
1090-
1091-
impl GenServer for L1Committer {
1092-
type CallMsg = CallMessage;
1093-
type CastMsg = InMessage;
1094-
type OutMsg = OutMessage;
1095-
1096-
type Error = CommitterError;
10971090

1098-
// Right now we only have the `Commit` message, so we ignore the `message` parameter
1099-
async fn handle_cast(
1100-
&mut self,
1101-
_message: Self::CastMsg,
1102-
handle: &GenServerHandle<Self>,
1103-
) -> CastResponse {
1091+
async fn handle_commit_message(&mut self, handle: &GenServerHandle<Self>) -> CastResponse {
11041092
if let SequencerStatus::Sequencing = self.sequencer_state.status().await {
11051093
let current_last_committed_batch =
11061094
get_last_committed_batch(&self.eth_client, self.on_chain_proposer_address)
@@ -1151,6 +1139,33 @@ impl GenServer for L1Committer {
11511139
self.schedule_commit(self.committer_wake_up_ms, handle.clone());
11521140
CastResponse::NoReply
11531141
}
1142+
}
1143+
1144+
impl GenServer for L1Committer {
1145+
type CallMsg = CallMessage;
1146+
type CastMsg = InMessage;
1147+
type OutMsg = OutMessage;
1148+
1149+
type Error = CommitterError;
1150+
1151+
// Dispatches on the cast message: `Commit` runs a commit cycle, `Abort` cancels the token (if any) and stops the GenServer
1152+
async fn handle_cast(
1153+
&mut self,
1154+
message: Self::CastMsg,
1155+
handle: &GenServerHandle<Self>,
1156+
) -> CastResponse {
1157+
match message {
1158+
InMessage::Commit => self.handle_commit_message(handle).await,
1159+
InMessage::Abort => {
1160+
// start_blocking keeps the committer loop alive even if the JoinSet aborts the task.
1161+
// Returning CastResponse::Stop is what unblocks shutdown by ending that blocking loop.
1162+
if let Some(ct) = self.cancellation_token.take() {
1163+
ct.cancel()
1164+
};
1165+
CastResponse::Stop
1166+
}
1167+
}
1168+
}
11541169

11551170
async fn handle_call(
11561171
&mut self,

crates/l2/sequencer/mod.rs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use l1_watcher::L1Watcher;
2020
use metrics::MetricsGatherer;
2121
use proof_coordinator::ProofCoordinator;
2222
use reqwest::Url;
23+
use spawned_concurrency::tasks::GenServerHandle;
24+
use std::pin::Pin;
2325
use tokio_util::sync::CancellationToken;
2426
use tracing::{error, info};
2527
use utils::get_needed_proof_types;
@@ -49,7 +51,14 @@ pub async fn start_l2(
4951
_l2_url: Url,
5052
genesis: Genesis,
5153
checkpoints_dir: PathBuf,
52-
) -> Result<(), errors::SequencerError> {
54+
) -> Result<
55+
(
56+
Option<GenServerHandle<L1Committer>>,
57+
Option<GenServerHandle<BlockProducer>>,
58+
Pin<Box<dyn Future<Output = Result<(), errors::SequencerError>> + Send>>,
59+
),
60+
errors::SequencerError,
61+
> {
5362
let initial_status = if cfg.based.enabled {
5463
SequencerStatus::default()
5564
} else {
@@ -76,7 +85,11 @@ pub async fn start_l2(
7685
)
7786
.await
7887
.inspect_err(|e| error!("Error starting Sequencer: {e}")) else {
79-
return Ok(());
88+
return Ok((
89+
None,
90+
None,
91+
Box::pin(async { Ok::<(), errors::SequencerError>(()) }),
92+
));
8093
};
8194

8295
if needed_proof_types.contains(&ProverType::TDX)
@@ -85,7 +98,11 @@ pub async fn start_l2(
8598
error!(
8699
"A private key for TDX is required. Please set the flag `--proof-coordinator.tdx-private-key <KEY>` or use the `ETHREX_PROOF_COORDINATOR_TDX_PRIVATE_KEY` environment variable to set the private key"
87100
);
88-
return Ok(());
101+
return Ok((
102+
None,
103+
None,
104+
Box::pin(async { Ok::<(), errors::SequencerError>(()) }),
105+
));
89106
}
90107

91108
let l1_watcher = L1Watcher::spawn(
@@ -120,7 +137,6 @@ pub async fn start_l2(
120137
.inspect_err(|err| {
121138
error!("Error starting Proof Coordinator: {err}");
122139
});
123-
124140
let l1_proof_sender = L1ProofSender::spawn(
125141
cfg.clone(),
126142
shared_state.clone(),
@@ -196,15 +212,17 @@ pub async fn start_l2(
196212
.await?;
197213
}
198214

215+
let l1_committer_handle = l1_committer.ok();
216+
let block_producer_handle = block_producer.ok();
199217
let admin_server = start_api(
200218
format!(
201219
"{}:{}",
202220
cfg.admin_server.listen_ip, cfg.admin_server.listen_port
203221
),
204-
l1_committer.ok(),
222+
l1_committer_handle.clone(),
205223
l1_watcher.ok(),
206224
l1_proof_sender.ok(),
207-
block_producer.ok(),
225+
block_producer_handle.clone(),
208226
#[cfg(feature = "metrics")]
209227
metrics_gatherer.ok(),
210228
)
@@ -214,26 +232,27 @@ pub async fn start_l2(
214232
})
215233
.ok();
216234

217-
match (verifier_handle, admin_server) {
218-
(Some(handle), Some(admin_server)) => {
219-
let (server_res, verifier_res) = tokio::join!(admin_server.into_future(), handle);
220-
if let Err(e) = server_res {
221-
error!("Admin server task error: {e}");
235+
let driver = Box::pin(async move {
236+
match (verifier_handle, admin_server) {
237+
(Some(handle), Some(admin_server)) => {
238+
let (server_res, verifier_res) = tokio::join!(admin_server.into_future(), handle);
239+
if let Err(e) = server_res {
240+
error!("Admin server task error: {e}");
241+
}
242+
handle_verifier_result(verifier_res).await;
222243
}
223-
handle_verifier_result(verifier_res).await;
224-
}
225-
(Some(handle), None) => {
226-
handle_verifier_result(tokio::join!(handle).0).await;
227-
}
228-
(None, Some(admin_server)) => {
229-
if let Err(e) = admin_server.into_future().await {
230-
error!("Admin server task error: {e}");
244+
(Some(handle), None) => handle_verifier_result(tokio::join!(handle).0).await,
245+
(None, Some(admin_server)) => {
246+
if let Err(e) = admin_server.into_future().await {
247+
error!("Admin server task error: {e}");
248+
}
231249
}
250+
(None, None) => {}
232251
}
233-
(None, None) => {}
234-
}
235252

236-
Ok(())
253+
Ok(())
254+
});
255+
Ok((l1_committer_handle, block_producer_handle, driver))
237256
}
238257

239258
async fn handle_verifier_result(res: Result<Result<(), SequencerError>, tokio::task::JoinError>) {

0 commit comments

Comments
 (0)