From 1b1b73a24bbcabc96ee0720bcb1194a1034cb609 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 31 Mar 2026 14:08:50 +0200 Subject: [PATCH 1/7] feature ok / needs to be tested --- src/apps/competitions/tasks.py | 62 +- src/celery_config.py | 9 + src/routing.py | 3 + .../riot/competitions/detail/detail.tag | 709 +++++++++++++++++- src/urls.py | 1 - src/utils/consumers.py | 72 ++ 6 files changed, 827 insertions(+), 29 deletions(-) create mode 100644 src/utils/consumers.py diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index d72e9191a..95c30fd35 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -32,6 +32,9 @@ from utils.data import make_url_sassy from utils.email import codalab_send_markdown_email +from channels.layers import get_channel_layer +from asgiref.sync import async_to_sync + import logging logger = logging.getLogger(__name__) @@ -784,9 +787,66 @@ def submission_status_cleanup(): submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent') for sub in submissions: - # Check if the submission has been running for 24 hours longer than execution_time_limit if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit): if sub.parent is not None: sub.parent.cancel(status=Submission.FAILED) else: sub.cancel(status=Submission.FAILED) + + +def _broadcast_worker_state(payload): + channel_layer = get_channel_layer() + if not channel_layer: + return + + async_to_sync(channel_layer.group_send)( + "compute_workers", + { + "type": "worker.health", + "worker": payload, + }, + ) + + +@app.task(queue="site-worker", soft_time_limit=60) +def refresh_compute_worker_health(): + celery_app = app_or_default() + inspector = celery_app.control.inspect(timeout=1) + + if inspector is None: + logger.warning("Celery inspect returned None") + return + + try: + stats = inspector.stats() or {} + active = inspector.active() or {} + reserved = inspector.reserved() or {} + except Exception: + logger.exception("Unable to inspect Celery workers") + return + + for worker_name in stats.keys(): + if not worker_name.startswith("compute-worker"): + continue + + raw_running_jobs = len(active.get(worker_name, [])) + len(reserved.get(worker_name, [])) + status = "busy" if raw_running_jobs > 0 else "available" + + payload = { + "hostname": worker_name, + "status": status, + "running_jobs": raw_running_jobs, + "timestamp": now().timestamp(), + } + + r.set(f"worker:{worker_name}:heartbeat", json.dumps(payload), ex=35) + r.hset( + WORKERS_REGISTRY_KEY, + worker_name, + json.dumps({ + "hostname": worker_name, + "last_seen": payload["timestamp"], + }), + ) + + _broadcast_worker_state(payload) \ No newline at end of file diff --git a/src/celery_config.py b/src/celery_config.py index 760614783..995103fcc 100644 --- a/src/celery_config.py +++ b/src/celery_config.py @@ -36,3 +36,12 @@ def app_for_vhost(vhost): vhost_app.conf.task_queues = app.conf.task_queues _vhost_apps[vhost] = vhost_app return _vhost_apps[vhost] + + + +app.conf.beat_schedule = { + "refresh-compute-worker-health": { + "task": "chemin.vers.refresh_compute_worker_health", + "schedule": 5.0, + }, +} \ No newline at end of file diff --git a/src/routing.py b/src/routing.py index 2ef280e73..adf3f44cc 100644 --- a/src/routing.py +++ b/src/routing.py @@ -1,7 +1,10 @@ from django.urls import re_path from apps.competitions.consumers import SubmissionIOConsumer, SubmissionOutputConsumer +from utils.consumers import ComputeWorkersConsumer + websocket_urlpatterns = [ re_path(r'submission_input/(?P\d+)/(?P\d+)/(?P[^/]+)/$', SubmissionIOConsumer.as_asgi()), re_path(r'submission_output/$', SubmissionOutputConsumer.as_asgi()), + re_path(r"ws/workers/$", ComputeWorkersConsumer.as_asgi()), ] diff --git a/src/static/riot/competitions/detail/detail.tag b/src/static/riot/competitions/detail/detail.tag index f0c9418c1..efe60a293 100644 --- a/src/static/riot/competitions/detail/detail.tag +++ b/src/static/riot/competitions/detail/detail.tag @@ -1,65 +1,720 @@ - - - +
+
+ + + +
+ + +
+ + -
+ \ No newline at end of file diff --git a/src/urls.py b/src/urls.py index 2634d19d6..88013d5a7 100644 --- a/src/urls.py +++ b/src/urls.py @@ -33,7 +33,6 @@ ] - if settings.DEBUG: # Static files for local dev, so we don't have to collectstatic and such urlpatterns += staticfiles_urlpatterns() diff --git a/src/utils/consumers.py b/src/utils/consumers.py new file mode 100644 index 000000000..817b85342 --- /dev/null +++ b/src/utils/consumers.py @@ -0,0 +1,72 @@ +import asyncio +import time + +from asgiref.sync import sync_to_async +from channels.generic.websocket import AsyncJsonWebsocketConsumer +from celery._state import app_or_default + + +class ComputeWorkersConsumer(AsyncJsonWebsocketConsumer): + async def connect(self): + await self.accept() + self._running = True + self._task = asyncio.create_task(self._push_workers_loop()) + + async def disconnect(self, close_code): + self._running = False + if hasattr(self, "_task"): + self._task.cancel() + + async def _push_workers_loop(self): + while self._running: + workers = await sync_to_async(self._load_snapshot)() + await self.send_json({ + "type": "workers.snapshot", + "workers": workers, + }) + await asyncio.sleep(3) + + def _load_snapshot(self): + celery_app = app_or_default() + inspector = celery_app.control.inspect(timeout=2) + + if inspector is None: + return [] + + try: + stats = inspector.stats() or {} + active = inspector.active() or {} + reserved = inspector.reserved() or {} + active_queues = inspector.active_queues() or {} + except Exception: + return [] + + workers = [] + + for worker_name in stats.keys(): + queues = active_queues.get(worker_name, []) or [] + queue_names = [] + for q in queues: + if isinstance(q, dict) and q.get("name"): + queue_names.append(q["name"]) + + is_compute_worker = ( + "compute-worker" in queue_names + or worker_name.startswith("compute-worker") + or worker_name.startswith("CW") + ) + + if not is_compute_worker: + continue + + running_jobs = len(active.get(worker_name, [])) + len(reserved.get(worker_name, [])) + status = "busy" if running_jobs > 0 else "available" + + workers.append({ + "hostname": worker_name, + "status": status, + "running_jobs": running_jobs, + "timestamp": time.time(), + }) + + return workers \ No newline at end of file From 2877a739ab7b0df2844011e015098c1c4807e0e5 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Thu, 2 Apr 2026 13:43:14 +0200 Subject: [PATCH 2/7] feature deactivated by default + hiden behind a button by default + visibile only by admin or comp organizer or collaborators --- .../riot/competitions/detail/detail.tag | 968 ++++++++---------- src/templates/competitions/detail.html | 8 +- src/utils/consumers.py | 5 + 3 files changed, 458 insertions(+), 523 deletions(-) diff --git a/src/static/riot/competitions/detail/detail.tag b/src/static/riot/competitions/detail/detail.tag index efe60a293..44f6c222a 100644 --- a/src/static/riot/competitions/detail/detail.tag +++ b/src/static/riot/competitions/detail/detail.tag @@ -6,7 +6,17 @@ -