Skip to content

Commit f5a5102

Browse files
committed
fix(runtime): evict runners on SIGTERM, add periodic shutdown progress messages
1 parent 02023f8 commit f5a5102

File tree

7 files changed

+72
-8
lines changed

7 files changed

+72
-8
lines changed

engine/artifacts/errors/ws.going_away.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/gasoline/src/worker.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub(crate) const PING_INTERVAL: Duration = Duration::from_secs(10);
2525
const METRICS_INTERVAL: Duration = Duration::from_secs(20);
2626
// How long the pull workflows function can take before shutting down the runtime.
2727
const PULL_WORKFLOWS_TIMEOUT: Duration = Duration::from_secs(10);
28+
// How often a progress message is logged while the worker is shutting down.
const SHUTDOWN_PROGRESS_INTERVAL: Duration = Duration::from_secs(7);
2829

2930
/// Used to spawn a new thread that indefinitely polls the database for new workflows. Only pulls workflows
3031
/// that are registered in its registry. After pulling, the workflows are run and their state is written to
@@ -297,6 +298,9 @@ impl Worker {
297298
.map(|(_, wf)| &mut wf.handle)
298299
.collect::<FuturesUnordered<_>>();
299300

301+
let mut progress_interval = tokio::time::interval(SHUTDOWN_PROGRESS_INTERVAL);
302+
progress_interval.tick().await;
303+
300304
let shutdown_start = Instant::now();
301305
loop {
302306
// Future will resolve once all workflow tasks complete
@@ -306,6 +310,9 @@ impl Worker {
306310
_ = join_fut => {
307311
break;
308312
}
313+
_ = progress_interval.tick() => {
314+
tracing::info!(remaining_workflows=%wf_futs.len(), "worker still shutting down");
315+
}
309316
abort = term_signal.recv() => {
310317
if abort {
311318
tracing::warn!("aborting worker shutdown");

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2522,6 +2522,10 @@ impl ProxyServiceFactory {
25222522
pub async fn wait_idle(&self) {
25232523
self.state.tasks.wait_idle().await
25242524
}
2525+
2526+
/// Returns the task count reported by this factory's task group
/// (`self.state.tasks.remaining_tasks()`).
pub fn remaining_tasks(&self) -> usize {
2527+
self.state.tasks.remaining_tasks()
2528+
}
25252529
}
25262530

25272531
fn add_proxy_headers_with_addr(

engine/packages/guard-core/src/server.rs

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::{
22
net::SocketAddr,
3-
sync::Arc,
3+
sync::{
4+
Arc,
5+
atomic::{AtomicBool, Ordering},
6+
},
47
time::{Duration, Instant},
58
};
69

@@ -15,6 +18,8 @@ use crate::cert_resolver::{CertResolverFn, create_tls_config};
1518
use crate::metrics;
1619
use crate::proxy_service::{CacheKeyFn, MiddlewareFn, ProxyServiceFactory, RoutingFn};
1720

21+
// How often a progress message is logged while guard is shutting down.
const SHUTDOWN_PROGRESS_INTERVAL: Duration = Duration::from_secs(7);
22+
1823
// Start the server
1924
#[tracing::instrument(skip_all)]
2025
pub async fn run_server(
@@ -248,26 +253,41 @@ pub async fn run_server(
248253
}
249254

250255
let shutdown_duration = config.runtime.guard_shutdown_duration();
251-
tracing::info!(duration=?shutdown_duration, "starting guard shutdown");
252-
256+
let remaining_tasks = http_factory.remaining_tasks()
257+
+ https_factory
258+
.as_ref()
259+
.map(|f| f.remaining_tasks())
260+
.unwrap_or(0);
261+
tracing::info!(%remaining_tasks, hyper_shutdown=%false, duration=?shutdown_duration, "starting guard shutdown");
262+
263+
// Signifies that the hyper graceful shutdown completed
264+
let hyper_shutdown = Arc::new(AtomicBool::new(false));
265+
266+
let hyper_shutdown2 = hyper_shutdown.clone();
267+
let http_factory2 = http_factory.clone();
268+
let https_factory2 = https_factory.clone();
253269
let mut complete_fut = async move {
254270
// Wait until remaining requests finish
255271
graceful.shutdown().await;
272+
hyper_shutdown2.store(true, Ordering::Release);
256273

257274
// Wait until remaining tasks finish
258-
http_factory.wait_idle().await;
275+
http_factory2.wait_idle().await;
259276

260-
if let Some(https_factory) = https_factory {
277+
if let Some(https_factory) = https_factory2 {
261278
https_factory.wait_idle().await;
262279
}
263280
}
264281
.boxed();
265282

283+
let mut progress_interval = tokio::time::interval(SHUTDOWN_PROGRESS_INTERVAL);
284+
progress_interval.tick().await;
285+
266286
let shutdown_start = Instant::now();
267287
loop {
268288
tokio::select! {
269289
_ = &mut complete_fut => {
270-
tracing::info!("all guard requests completed");
290+
tracing::info!("all guard tasks completed");
271291
break;
272292
}
273293
abort = term_signal.recv() => {
@@ -276,8 +296,15 @@ pub async fn run_server(
276296
break;
277297
}
278298
}
299+
_ = progress_interval.tick() => {
300+
let remaining_tasks = http_factory.remaining_tasks() +
301+
https_factory.as_ref().map(|f| f.remaining_tasks()).unwrap_or(0);
302+
let hyper_shutdown = hyper_shutdown.load(Ordering::Acquire);
303+
304+
tracing::info!(%remaining_tasks, hyper_shutdown, "guard still shutting down");
305+
}
279306
_ = tokio::time::sleep(shutdown_duration.saturating_sub(shutdown_start.elapsed())) => {
280-
tracing::warn!("guard shutdown timed out before all requests completed");
307+
tracing::warn!("guard shutdown timed out before all tasks completed");
281308
break;
282309
}
283310
}

engine/packages/guard-core/src/task_group.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,8 @@ impl TaskGroup {
5454
}
5555
}
5656
}
57+
58+
/// Returns the current value of `running_count`, read with `Acquire` ordering.
pub fn remaining_tasks(&self) -> usize {
59+
self.running_count.load(Ordering::Acquire)
60+
}
5761
}

engine/packages/pegboard-runner/src/errors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ pub enum WsError {
1111
"The websocket has been evicted and should not attempt to reconnect."
1212
)]
1313
Eviction,
14+
#[error(
15+
"going_away",
16+
"The Rivet Engine is migrating. The websocket should attempt to reconnect as soon as possible."
17+
)]
18+
GoingAway,
1419
#[error(
1520
"timed_out_waiting_for_init",
1621
"Timed out waiting for the init packet to be sent."

engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,17 @@ pub async fn task_inner(
5454
event_demuxer: &mut ActorEventDemuxer,
5555
) -> Result<LifecycleResult> {
5656
let mut ws_rx = ws_rx.lock().await;
57+
let mut term_signal = rivet_runtime::TermSignal::new().await;
5758

5859
loop {
59-
match recv_msg(&mut ws_rx, &mut eviction_sub2, &mut ws_to_tunnel_abort_rx).await? {
60+
match recv_msg(
61+
&mut ws_rx,
62+
&mut eviction_sub2,
63+
&mut ws_to_tunnel_abort_rx,
64+
&mut term_signal,
65+
)
66+
.await?
67+
{
6068
Ok(Some(msg)) => {
6169
if protocol::is_mk2(conn.protocol_version) {
6270
handle_message_mk2(&ctx, &conn, event_demuxer, msg).await?;
@@ -74,6 +82,7 @@ async fn recv_msg(
7482
ws_rx: &mut MutexGuard<'_, WebSocketReceiver>,
7583
eviction_sub2: &mut Subscriber,
7684
ws_to_tunnel_abort_rx: &mut watch::Receiver<()>,
85+
term_signal: &mut rivet_runtime::TermSignal,
7786
) -> Result<std::result::Result<Option<Bytes>, LifecycleResult>> {
7887
let msg = tokio::select! {
7988
res = ws_rx.try_next() => {
@@ -92,6 +101,9 @@ async fn recv_msg(
92101
tracing::debug!("task aborted");
93102
return Ok(Err(LifecycleResult::Aborted));
94103
}
104+
_ = term_signal.recv() => {
105+
return Err(errors::WsError::GoingAway.build());
106+
}
95107
};
96108

97109
match msg {

0 commit comments

Comments
 (0)