From 70f4c12c65ff5068efc83dc924b878131b74a86d Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Fri, 27 Mar 2026 01:53:34 +0500 Subject: [PATCH 1/6] reorganizing and cleaning of compute worker --- compute_worker/compute_worker.py | 323 +++++++++++++++++-------------- 1 file changed, 177 insertions(+), 146 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 573e72a94..e7d78ba8b 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -10,75 +10,163 @@ import tempfile import time import uuid +import requests +import websockets +import yaml +import docker +import logging +import sys # This is only needed for the pytests to pass from shutil import make_archive from urllib.error import HTTPError from urllib.parse import urlparse from urllib.request import urlretrieve from zipfile import ZipFile, BadZipFile -import docker -from rich.progress import Progress +from urllib3 import Retry + from rich.pretty import pprint -import requests -import websockets -import yaml -from billiard.exceptions import SoftTimeLimitExceeded -from celery import Celery, shared_task, utils +from rich.progress import Progress from kombu import Queue, Exchange -from urllib3 import Retry +from celery import Celery, shared_task, utils, signals +from billiard.exceptions import SoftTimeLimitExceeded -# This is only needed for the pytests to pass -import sys +from logs_loguru import configure_logging, colorize_run_args + +logger = logging.getLogger(__name__) sys.path.append("/app/src/settings/") -from celery import signals -import logging -logger = logging.getLogger(__name__) -from logs_loguru import configure_logging, colorize_run_args -import json +# ----------------------------------------------- +# Env Settings +# ----------------------------------------------- +class Settings: + + @staticmethod + def get(key, default=None): + """ + Return the env var value if set, else default; returns None if not set and no default. 
+ """ + val = os.getenv(key) + + if val is not None: + return val + + if default is not None: + return default + + logger.warning(f"Environment variable '{key}' not found and no default provided.") + return None + + @staticmethod + def to_bool(val): + try: + if isinstance(val, bool): + return val + + val_str = str(val).strip() + + if val_str in ("true", "True", "TRUE", "1"): + return True + if val_str in ("false", "False", "FALSE", "0"): + return False + + logger.warning(f"Failed to parse boolean from '{val}'") + return val + + except Exception as e: + logger.warning(f"Failed to parse boolean from '{val}': {e}") + return val + + # Constants + DOCKER = "docker" + PODMAN = "podman" + LOG_LEVEL_DEBUG = "debug" + + # Defaults + DEFAULT_SOCKETS = { + DOCKER: "unix:///var/run/docker.sock", + PODMAN: "unix:///run/user/1000/podman/podman.sock", + } + + # Settings variables + LOG_LEVEL = get("LOG_LEVEL", "INFO").lower() + SERIALIZED = get("SERIALIZED", "false") + + USE_GPU = to_bool(get("USE_GPU", "false")) + CONTAINER_ENGINE_EXECUTABLE = get("CONTAINER_ENGINE_EXECUTABLE", DOCKER).lower() + GPU_DEVICE = get("GPU_DEVICE", "nvidia.com/gpu=all") + + CONTAINER_SOCKET = get("CONTAINER_SOCKET", DEFAULT_SOCKETS.get(CONTAINER_ENGINE_EXECUTABLE)) + + HOST_DIRECTORY = get("HOST_DIRECTORY", "/tmp/codabench/") + MAX_CACHE_DIR_SIZE_GB = get("MAX_CACHE_DIR_SIZE_GB", 10) + + COMPETITION_CONTAINER_NETWORK_DISABLED = to_bool(get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False")) + COMPETITION_CONTAINER_HTTP_PROXY = get("COMPETITION_CONTAINER_HTTP_PROXY", "") + COMPETITION_CONTAINER_HTTPS_PROXY = get("COMPETITION_CONTAINER_HTTPS_PROXY", "") + + CODALAB_IGNORE_CLEANUP_STEP = to_bool(get("CODALAB_IGNORE_CLEANUP_STEP")) + + WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "") + + +# ----------------------------------------------- +# Program Kind +# ----------------------------------------------- +# NOTE: This is not used, to be used in next PR +class ProgramKind: + 
INGESTION_PROGRAM = "ingestion_program" + SCORING_PROGRAM = "scoring_program" + SUBMISSION = "submission" + + +# ----------------------------------------------- +# Submission status +# ----------------------------------------------- +class SubmissionStatus: + NONE = "None" + SUBMITTING = "Submitting" + SUBMITTED = "Submitted" + PREPARING = "Preparing" + RUNNING = "Running" + SCORING = "Scoring" + FINISHED = "Finished" + FAILED = "Failed" + + AVAILABLE_STATUSES = ( + NONE, + SUBMITTING, + SUBMITTED, + PREPARING, + RUNNING, + SCORING, + FINISHED, + FAILED, + ) # ----------------------------------------------- # Logging # ----------------------------------------------- configure_logging( - os.environ.get("LOG_LEVEL", "INFO"), os.environ.get("SERIALIZED", "false") + Settings.LOG_LEVEL, Settings.SERIALIZED ) # ----------------------------------------------- # Initialize Docker or Podman depending on .env # ----------------------------------------------- -if os.environ.get("USE_GPU", "false").lower() == "true": - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + "with GPU capabilites : " - + os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all") - + " network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) -else: - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + " without GPU capabilities. " - + "network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) +logger.info( + f"Using {Settings.CONTAINER_ENGINE_EXECUTABLE} " + f"{'with GPU capabilities: ' + Settings.GPU_DEVICE if Settings.USE_GPU else 'without GPU capabilities'}. 
" + f"Network disabled for the competition container is set to {Settings.COMPETITION_CONTAINER_NETWORK_DISABLED}" ) -if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": - client = docker.APIClient( - base_url=os.environ.get("CONTAINER_SOCKET", "unix:///var/run/docker.sock"), - version="auto", - ) -elif os.environ.get("CONTAINER_ENGINE_EXECUTABLE").lower() == "podman": - client = docker.APIClient( - base_url=os.environ.get( - "CONTAINER_SOCKET", "unix:///run/user/1000/podman/podman.sock" - ), - version="auto", - ) +# Initializing client +# NOTE: CONTAINER_SOCKET is set in Settings based on CONTAINER_ENGINE_EXECUTABLE which must have either podman or docker +client = docker.APIClient( + base_url=Settings.CONTAINER_SOCKET, + version="auto", +) # ----------------------------------------------- @@ -147,8 +235,8 @@ def show_progress(line, progress): total=total, ) except Exception as e: - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": - logger.exception("There was an error showing the progress bar") + if Settings.LOG_LEVEL == Settings.DEBUG: + logger.exception(f"There was an error showing the progress bar: {e}") # ----------------------------------------------- @@ -175,35 +263,11 @@ def setup_celery_logging(**kwargs): # Directories # ----------------------------------------------- # Setup base directories used by all submissions -# note: we need to pass this directory to docker/podman so it knows where to store things! -HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/") +# NOTE: we need to pass this directory to docker/podman so it knows where to store things! 
+HOST_DIRECTORY = Settings.HOST_DIRECTORY BASE_DIR = "/codabench/" # base directory inside the container CACHE_DIR = os.path.join(BASE_DIR, "cache") -MAX_CACHE_DIR_SIZE_GB = float(os.environ.get("MAX_CACHE_DIR_SIZE_GB", 10)) - - -# ----------------------------------------------- -# Submission status -# ----------------------------------------------- -# Status options for submissions -STATUS_NONE = "None" -STATUS_SUBMITTING = "Submitting" -STATUS_SUBMITTED = "Submitted" -STATUS_PREPARING = "Preparing" -STATUS_RUNNING = "Running" -STATUS_SCORING = "Scoring" -STATUS_FINISHED = "Finished" -STATUS_FAILED = "Failed" -AVAILABLE_STATUSES = ( - STATUS_NONE, - STATUS_SUBMITTING, - STATUS_SUBMITTED, - STATUS_PREPARING, - STATUS_RUNNING, - STATUS_SCORING, - STATUS_FINISHED, - STATUS_FAILED, -) +MAX_CACHE_DIR_SIZE_GB = float(Settings.MAX_CACHE_DIR_SIZE_GB) # ----------------------------------------------- @@ -232,7 +296,7 @@ def rewrite_bundle_url_if_needed(url): Example: http://localhost:9000|http://minio:9000 """ - rule = os.getenv("WORKER_BUNDLE_URL_REWRITE", "").strip() + rule = Settings.WORKER_BUNDLE_URL_REWRITE.strip() if not rule or "|" not in rule: return url src, dst = rule.split("|", 1) @@ -265,13 +329,11 @@ def run_wrapper(run_args): msg = f"Docker image pull failed: {msg}" else: msg = "Docker image pull failed." - run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except SoftTimeLimitExceeded: run._update_status( - STATUS_FAILED, - extra_information="Execution time limit exceeded.", - ) + SubmissionStatus.FAILED, extra_information="Execution time limit exceeded.") raise except SubmissionException as e: msg = str(e).strip() @@ -279,11 +341,11 @@ def run_wrapper(run_args): msg = f"Submission failed: {msg}. See logs for more details." else: msg = "Submission failed. See logs for more details." 
- run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except Exception as e: # Catch any exception to avoid getting stuck in Running status - run._update_status(STATUS_FAILED, extra_information=traceback.format_exc()) + run._update_status(SubmissionStatus.FAILED, extra_information=traceback.format_exc()) raise finally: try: @@ -573,9 +635,9 @@ def _update_submission(self, data): def _update_status(self, status, extra_information=None): # Update submission status - if status not in AVAILABLE_STATUSES: + if status not in SubmissionStatus.AVAILABLE_STATUSES: raise SubmissionException( - f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}" + f"Status '{status}' is not in available statuses: {SubmissionStatus.AVAILABLE_STATUSES}" ) data = {"status": status, "status_details": extra_information} try: @@ -738,26 +800,18 @@ async def _run_container_engine_cmd(self, container, kind): websocket = None try: websocket_url = f"{self.websocket_url}?kind={kind}" - logger.debug( - "Connecting to " - + websocket_url - + "for container " - + str(container.get("Id")) - ) + logger.debug(f"Connecting to {websocket_url} for container {str(container.get('Id'))}") websocket = await asyncio.wait_for( websockets.connect(websocket_url), timeout=10.0 ) - logger.debug( - "connected to " - + str(websocket_url) - + "for container " - + str(container.get("Id")) - ) + logger.debug(f"connected to {websocket_url} for container {str(container.get('Id'))}") + except Exception as e: logger.error( f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) start = time.time() @@ -775,8 +829,7 @@ async def _run_container_engine_cmd(self, container, kind): # If we enter the for loop after the container exited, the program will get stuck if ( - 
client.inspect_container(container)["State"]["Status"].lower() - == "running" + client.inspect_container(container)["State"]["Status"].lower() == "running" ): logger.debug( "Show the logs and stream them to codabench " + container.get("Id") @@ -793,7 +846,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error(e) - + # Errors elif log[1] is not None: stderr_chunks.append(log[1]) @@ -812,7 +865,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error( f"There was an error while starting the container and getting the logs: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) # Get the return code of the competition container once done @@ -831,12 +884,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) client.remove_container(container, force=True) - logger.debug( - "Container " - + container.get("Id") - + "exited with status code : " - + str(return_Code["StatusCode"]) - ) + logger.debug(f"Container {container.get('Id')} exited with status code : {str(return_Code['StatusCode'])}") except ( requests.exceptions.ReadTimeout, @@ -849,7 +897,7 @@ async def _run_container_engine_cmd(self, container, kind): finally: try: # Last chance of removing container - client.remove_container(container_id, force=True) + client.remove_container(container.get("Id"), force=True) except Exception: pass @@ -885,7 +933,7 @@ def _get_host_path(self, *paths): path = os.path.join(*paths) # pull front of path, which points to the location inside the container - path = path[len(BASE_DIR) :] + path = path[len(BASE_DIR):] # add host to front, so when we run commands in the container on the host they # can be seen properly @@ -1044,14 +1092,16 @@ async def _run_program_directory(self, program_dir, kind): "SETUID", "SYS_CHROOT", ] + # Configure whether or not we use the GPU. 
Also setting auto_remove to False because - if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": + if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.DOCKER: security_options = ["no-new-privileges"] else: security_options = ["label=disable"] + # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given - device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")] - if os.environ.get("USE_GPU", "false").lower() == "true": + device_id = [Settings.GPU_DEVICE] + if Settings.USE_GPU: logger.info("Running the container with GPU capabilities") host_config = client.create_host_config( auto_remove=False, @@ -1081,26 +1131,10 @@ async def _run_program_directory(self, program_dir, kind): if kind == "ingestion" else self.program_container_name ) - # Disable or not the competition container access to Internet (False by default) - container_network_disabled = os.environ.get( - "COMPETITION_CONTAINER_NETWORK_DISABLED", "" - ) + # Creating container + # COMPETITION_CONTAINER_NETWORK_DISABLED: Disable or not the competition container access to Internet (False by default) # HTTP and HTTPS proxy for the competition container if needed - competition_container_proxy_http = os.environ.get( - "COMPETITION_CONTAINER_HTTP_PROXY", "" - ) - competition_container_proxy_http = ( - "http_proxy=" + competition_container_proxy_http - ) - - competition_container_proxy_https = os.environ.get( - "COMPETITION_CONTAINER_HTTPS_PROXY", "" - ) - competition_container_proxy_https = ( - "https_proxy=" + competition_container_proxy_https - ) - container = client.create_container( self.container_image, name=container_name, @@ -1111,12 +1145,13 @@ async def _run_program_directory(self, program_dir, kind): working_dir="/app/program", environment=[ "PYTHONUNBUFFERED=1", - competition_container_proxy_http, - competition_container_proxy_https, + "http_proxy=" + 
Settings.COMPETITION_CONTAINER_HTTP_PROXY, + "https_proxy=" + Settings.COMPETITION_CONTAINER_HTTPS_PROXY, ], - network_disabled=container_network_disabled.lower() == "true", + network_disabled=Settings.COMPETITION_CONTAINER_NETWORK_DISABLED, ) - logger.debug("Created container : " + str(container)) + + logger.debug("Created container: " + str(container)) logger.info("Volume configuration of the container: ") pprint(volumes_config) # This runs the container engine command and asynchronously passes data back via websocket @@ -1196,16 +1231,12 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): def prepare(self): hostname = utils.nodenames.gethostname() if self.is_scoring: - self._update_status( - STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" - ) + self._update_status(SubmissionStatus.RUNNING, extra_information=f"scoring_hostname-{hostname}") else: - self._update_status( - STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" - ) + self._update_status(SubmissionStatus.RUNNING, extra_information=f"ingestion_hostname-{hostname}") if not self.is_scoring: # Only during prediction step do we want to announce "preparing" - self._update_status(STATUS_PREPARING) + self._update_status(SubmissionStatus.PREPARING) # Setup cache and prune if it's out of control self._prep_cache_dir() @@ -1295,7 +1326,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) # Send data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) @@ -1347,7 +1378,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) if kind == "program": self.program_exit_code = return_code 
@@ -1381,15 +1412,15 @@ def start(self): failed_rc = (program_rc is None) or (program_rc != 0) if had_async_exc or failed_rc: self._update_status( - STATUS_FAILED, + SubmissionStatus.FAILED, extra_information=f"program_rc={program_rc}, async={task_results}", ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") - self._update_status(STATUS_FINISHED) + self._update_status(SubmissionStatus.FINISHED) else: - self._update_status(STATUS_SCORING) + self._update_status(SubmissionStatus.SCORING) def push_scores(self): """This is only ran at the end of the scoring step""" @@ -1461,7 +1492,7 @@ def push_output(self): self._put_dir(self.scoring_result, self.output_dir) def clean_up(self): - if os.environ.get("CODALAB_IGNORE_CLEANUP_STEP"): + if Settings.CODALAB_IGNORE_CLEANUP_STEP: logger.warning( f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: {self.root_dir}" ) From ae39fd496f13e8b0abaca3e3f0056e98e36c566f Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Fri, 27 Mar 2026 02:02:02 +0500 Subject: [PATCH 2/6] minor updates --- compute_worker/compute_worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index e7d78ba8b..fec86936a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -107,7 +107,7 @@ def to_bool(val): CODALAB_IGNORE_CLEANUP_STEP = to_bool(get("CODALAB_IGNORE_CLEANUP_STEP")) - WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "") + WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "").strip() # ----------------------------------------------- @@ -235,7 +235,7 @@ def show_progress(line, progress): total=total, ) except Exception as e: - if Settings.LOG_LEVEL == Settings.DEBUG: + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(f"There was an error showing the progress bar: {e}") @@ -296,7 +296,7 @@ def 
rewrite_bundle_url_if_needed(url): Example: http://localhost:9000|http://minio:9000 """ - rule = Settings.WORKER_BUNDLE_URL_REWRITE.strip() + rule = Settings.WORKER_BUNDLE_URL_REWRITE if not rule or "|" not in rule: return url src, dst = rule.split("|", 1) From 5f42a0454bbac61cbd80f65327f591d87a9f8215 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Tue, 31 Mar 2026 21:32:44 +0500 Subject: [PATCH 3/6] some more cleaning --- compute_worker/compute_worker.py | 45 ++++++++++++++------------------ 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index fec86936a..32daa0a7b 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -37,7 +37,7 @@ # ----------------------------------------------- -# Env Settings +# Settings # ----------------------------------------------- class Settings: @@ -77,6 +77,13 @@ def to_bool(val): logger.warning(f"Failed to parse boolean from '{val}': {e}") return val + # Directories + # NOTE: we need to pass this directory to docker/podman so it knows where to store things! 
+ HOST_DIRECTORY = get("HOST_DIRECTORY", "/tmp/codabench/") + MAX_CACHE_DIR_SIZE_GB = float(get("MAX_CACHE_DIR_SIZE_GB", 10)) + BASE_DIR = "/codabench/" # base directory inside the container + CACHE_DIR = os.path.join(BASE_DIR, "cache") + # Constants DOCKER = "docker" PODMAN = "podman" @@ -88,7 +95,7 @@ def to_bool(val): PODMAN: "unix:///run/user/1000/podman/podman.sock", } - # Settings variables + # env variables LOG_LEVEL = get("LOG_LEVEL", "INFO").lower() SERIALIZED = get("SERIALIZED", "false") @@ -98,9 +105,6 @@ def to_bool(val): CONTAINER_SOCKET = get("CONTAINER_SOCKET", DEFAULT_SOCKETS.get(CONTAINER_ENGINE_EXECUTABLE)) - HOST_DIRECTORY = get("HOST_DIRECTORY", "/tmp/codabench/") - MAX_CACHE_DIR_SIZE_GB = get("MAX_CACHE_DIR_SIZE_GB", 10) - COMPETITION_CONTAINER_NETWORK_DISABLED = to_bool(get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False")) COMPETITION_CONTAINER_HTTP_PROXY = get("COMPETITION_CONTAINER_HTTP_PROXY", "") COMPETITION_CONTAINER_HTTPS_PROXY = get("COMPETITION_CONTAINER_HTTPS_PROXY", "") @@ -259,15 +263,6 @@ def setup_celery_logging(**kwargs): queue_arguments={"x-max-priority": 10}, ), ] -# ----------------------------------------------- -# Directories -# ----------------------------------------------- -# Setup base directories used by all submissions -# NOTE: we need to pass this directory to docker/podman so it knows where to store things! 
-HOST_DIRECTORY = Settings.HOST_DIRECTORY -BASE_DIR = "/codabench/" # base directory inside the container -CACHE_DIR = os.path.join(BASE_DIR, "cache") -MAX_CACHE_DIR_SIZE_GB = float(Settings.MAX_CACHE_DIR_SIZE_GB) # ----------------------------------------------- @@ -458,13 +453,11 @@ def __init__(self, run_args): # Directories for the run self.watch = True self.completed_program_counter = 0 - self.root_dir = tempfile.mkdtemp(prefix=f'{self.run_related_name}__', dir=BASE_DIR) + self.root_dir = tempfile.mkdtemp(prefix=f'{self.run_related_name}__', dir=Settings.BASE_DIR) self.bundle_dir = os.path.join(self.root_dir, "bundles") self.input_dir = os.path.join(self.root_dir, "input") self.output_dir = os.path.join(self.root_dir, "output") - self.data_dir = os.path.join( - HOST_DIRECTORY, "data" - ) # absolute path to data in the host + self.data_dir = os.path.join(Settings.HOST_DIRECTORY, "data") # absolute path to data in the host self.logs = {} # Details for submission @@ -743,7 +736,7 @@ def _get_bundle(self, url, destination, cache=True): # Hash url and download it if it doesn't exist url_without_params = url.split("?")[0] url_hash = hashlib.sha256(url_without_params.encode("utf8")).hexdigest() - bundle_file = os.path.join(CACHE_DIR, url_hash) + bundle_file = os.path.join(Settings.CACHE_DIR, url_hash) download_needed = not os.path.exists(bundle_file) else: if not os.path.exists(self.bundle_dir): @@ -933,11 +926,11 @@ def _get_host_path(self, *paths): path = os.path.join(*paths) # pull front of path, which points to the location inside the container - path = path[len(BASE_DIR):] + path = path[len(Settings.BASE_DIR):] # add host to front, so when we run commands in the container on the host they # can be seen properly - path = os.path.join(HOST_DIRECTORY, path) + path = os.path.join(Settings.HOST_DIRECTORY, path) # Create if necessary os.makedirs(path, exist_ok=True) @@ -1218,13 +1211,13 @@ def _put_file(self, url, file=None, raw_data=None, 
content_type="application/zip logger.info(f"response: {resp}") logger.info(f"content: {resp.content}") - def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): - if not os.path.exists(CACHE_DIR): - os.mkdir(CACHE_DIR) + def _prep_cache_dir(self, max_size=Settings.MAX_CACHE_DIR_SIZE_GB): + if not os.path.exists(Settings.CACHE_DIR): + os.mkdir(Settings.CACHE_DIR) logger.info("Checking if cache directory needs to be pruned...") - if get_folder_size_in_gb(CACHE_DIR) > max_size: + if get_folder_size_in_gb(Settings.CACHE_DIR) > max_size: logger.info("Pruning cache directory") - delete_files_in_folder(CACHE_DIR) + delete_files_in_folder(Settings.CACHE_DIR) else: logger.info("Cache directory does not need to be pruned!") From 5a38820c014f3a0a9d0bc202db8691cc22d31a01 Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 1 Apr 2026 15:47:02 +0200 Subject: [PATCH 4/6] More cleaning --- compute_worker/compute_worker.py | 52 +++++++++++++++----------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 32daa0a7b..c3fcbc3b7 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -405,6 +405,8 @@ def get_folder_size_in_gb(folder): def delete_files_in_folder(folder): + if not os.path.isdir(folder): + return for filename in os.listdir(folder): file_path = os.path.join(folder, filename) if os.path.isfile(file_path) or os.path.islink(file_path): @@ -975,15 +977,15 @@ async def _run_program_directory(self, program_dir, kind): logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}") with open(os.path.join(program_dir, metadata_path), "r") as metadata_file: try: # try to find a command in the metadata, in other cases set metadata to None - metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader) + metadata = yaml.safe_load(metadata_file.read()) logger.info(f"Metadata contains:\n {metadata}") if isinstance(metadata, dict): # command 
found command = metadata.get("command") else: command = None except yaml.YAMLError as e: - logger.error("Error parsing YAML file: ", e) - print("Error parsing YAML file: ", e) + logger.error(f"Error parsing YAML file: {e}") + print(f"Error parsing YAML file: {e}") command = None if not command and kind == "ingestion": raise SubmissionException( @@ -1227,7 +1229,6 @@ def prepare(self): self._update_status(SubmissionStatus.RUNNING, extra_information=f"scoring_hostname-{hostname}") else: self._update_status(SubmissionStatus.RUNNING, extra_information=f"ingestion_hostname-{hostname}") - if not self.is_scoring: # Only during prediction step do we want to announce "preparing" self._update_status(SubmissionStatus.PREPARING) @@ -1302,25 +1303,25 @@ def start(self): "error_message": error_message, "is_scoring": self.is_scoring, } - # Some cleanup - for kind, logs in self.logs.items(): - containers_to_kill = [] - containers_to_kill.append(self.ingestion_container_name) - containers_to_kill.append(self.program_container_name) - logger.debug( - "Trying to kill and remove container " + str(containers_to_kill) - ) - for container in containers_to_kill: - try: - client.remove_container(str(container), force=True) - except docker.errors.APIError as e: - logger.error(e) - except Exception as e: - logger.error( - f"There was a problem killing {containers_to_kill}: {e}" - ) - if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: - logger.exception(e) + # Cleanup containers + containers_to_kill = [ + self.ingestion_container_name, + self.program_container_name + ] + logger.debug( + "Trying to kill and remove container " + str(containers_to_kill) + ) + for container in containers_to_kill: + try: + client.remove_container(str(container), force=True) + except docker.errors.APIError as e: + logger.error(e) + except Exception as e: + logger.error( + f"There was a problem killing {containers_to_kill}: {e}" + ) + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: + logger.exception(e) # Send 
data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) # Send error through web socket to the frontend @@ -1390,9 +1391,6 @@ def start(self): # set logs of this kind to None, since we handled them already logger.info("Program finished") signal.alarm(0) - # Ensure loop is cleaned up - loop.close() - asyncio.set_event_loop(None) if self.is_scoring: # Check if scoring program failed @@ -1434,7 +1432,7 @@ def push_scores(self): elif os.path.exists(os.path.join(self.output_dir, "scores.txt")): scores_file = os.path.join(self.output_dir, "scores.txt") with open(scores_file) as f: - scores = yaml.load(f, yaml.Loader) + scores = yaml.safe_load(f) else: raise SubmissionException( "Could not find scores file, did the scoring program output it?" From 81925b8df3b379592d3609d0111d114190d97468 Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 1 Apr 2026 15:59:19 +0200 Subject: [PATCH 5/6] Minor syntax cleanup --- compute_worker/compute_worker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index c3fcbc3b7..2cd51baf1 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -823,9 +823,7 @@ async def _run_container_engine_cmd(self, container, kind): ) # If we enter the for loop after the container exited, the program will get stuck - if ( - client.inspect_container(container)["State"]["Status"].lower() == "running" - ): + if client.inspect_container(container)["State"]["Status"].lower() == "running": logger.debug( "Show the logs and stream them to codabench " + container.get("Id") ) From 9a726fe710eecb0ddc8e4a107a3186ba52e90caf Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 1 Apr 2026 16:19:29 +0200 Subject: [PATCH 6/6] RUNNING status during ingestion --- compute_worker/compute_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compute_worker/compute_worker.py 
b/compute_worker/compute_worker.py index 2cd51baf1..147ed2d2f 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1226,9 +1226,8 @@ def prepare(self): if self.is_scoring: self._update_status(SubmissionStatus.RUNNING, extra_information=f"scoring_hostname-{hostname}") else: - self._update_status(SubmissionStatus.RUNNING, extra_information=f"ingestion_hostname-{hostname}") # Only during prediction step do we want to announce "preparing" - self._update_status(SubmissionStatus.PREPARING) + self._update_status(SubmissionStatus.PREPARING, extra_information=f"ingestion_hostname-{hostname}") # Setup cache and prune if it's out of control self._prep_cache_dir() @@ -1268,6 +1267,7 @@ def prepare(self): # Before the run starts we want to download images, they may take a while to download # and to do this during the run would subtract from the participants time. self._get_container_image(self.container_image) + self._update_status(SubmissionStatus.RUNNING) def start(self): program_dir = os.path.join(self.root_dir, "program")