From 732814405927c9737c50ccb1c1443653aec48738 Mon Sep 17 00:00:00 2001 From: Juan <43722131+JuanTecedor@users.noreply.github.com> Date: Tue, 16 Jun 2026 22:37:33 +0200 Subject: [PATCH 1/2] Add typing to worker module --- cortexutils/py.typed | 0 cortexutils/worker.py | 67 ++++++++++++++++++++++++++++--------------- pyproject.toml | 3 ++ 3 files changed, 47 insertions(+), 23 deletions(-) create mode 100644 cortexutils/py.typed diff --git a/cortexutils/py.typed b/cortexutils/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/cortexutils/worker.py b/cortexutils/worker.py index acfa0ea..7013b38 100644 --- a/cortexutils/worker.py +++ b/cortexutils/worker.py @@ -5,20 +5,25 @@ import json import os import sys +from typing import Sequence, Any, NoReturn -DEFAULT_SECRET_PHRASES = ("key", "password", "secret") +DEFAULT_SECRET_PHRASES: Sequence[str] = ("key", "password", "secret") class Worker: READ_TIMEOUT = 3 # seconds - def __init__(self, job_directory, secret_phrases): + def __init__( + self, + job_directory: str | None, + secret_phrases: Sequence[str] | None = None, + ) -> None: if job_directory is None: if len(sys.argv) > 1: job_directory = sys.argv[1] else: job_directory = "/job" - self.job_directory = job_directory + self.job_directory: str | None = job_directory if secret_phrases is None: self.secret_phrases = DEFAULT_SECRET_PHRASES else: @@ -57,20 +62,20 @@ def __init__(self, job_directory, secret_phrases): self.__set_proxies() # Finally run check tlp - if not (self.__check_tlp()): + if not self.__check_tlp(): self.error("TLP is higher than allowed.") - if not (self.__check_pap()): + if not self.__check_pap(): self.error("PAP is higher than allowed.") - def __set_proxies(self): + def __set_proxies(self) -> None: if self.http_proxy is not None: os.environ["http_proxy"] = self.http_proxy if self.https_proxy is not None: os.environ["https_proxy"] = self.https_proxy @staticmethod - def __set_encoding(): + def __set_encoding() -> None: try: if sys.stdout.encoding != "UTF-8": sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict") @@ -79,7 +84,13 @@ def __set_encoding(): except Exception: pass # nosec B110 - def __get_param(self, source, name, default=None, message=None): + def __get_param( + self, + source: dict, + name: str | list[str], + default: Any = None, + message: str | None = None, + ) -> Any: """Extract a specific parameter from given source. :param source: Python dict to search through :param name: Name of the parameter to get. JSON-like syntax, @@ -104,17 +115,17 @@ def __get_param(self, source, name, default=None, message=None): self.error(message) return default - def __check_tlp(self): + def __check_tlp(self) -> bool: """Check if tlp is okay or not; returns False if too high.""" return not (self.enable_check_tlp and self.tlp > self.max_tlp) - def __check_pap(self): + def __check_pap(self) -> bool: """Check if pap is okay or not; returns False if too high.""" return not (self.enable_check_pap and self.pap > self.max_pap) - def __write_output(self, data, ensure_ascii=False): + def __write_output(self, data: dict, ensure_ascii: bool = False) -> None: if self.job_directory is None: json.dump(data, sys.stdout, ensure_ascii=ensure_ascii) else: @@ -124,31 +135,35 @@ def __write_output(self, data, ensure_ascii=False): with open(output_path, mode="w") as f_output: json.dump(data, f_output, ensure_ascii=ensure_ascii) - def get_data(self): + def get_data(self) -> Any: """Wrapper for getting data from input dict. :return: Data (observable value) given through Cortex""" return self.get_param("data", None, "Missing data field") @staticmethod - def build_operation(op_type, **parameters): + def build_operation(op_type: str, **parameters: dict) -> dict: """ :param op_type: an operation type as a string :param parameters: a dict including the operation's params :return: dict """ - operation = {"type": op_type} + operation: dict = {"type": op_type} operation.update(parameters) - return operation - def operations(self, raw): + def operations(self, raw: dict) -> list[dict]: """Returns the list of operations to be executed after the job completes :returns: by default return an empty array""" return [] - def get_param(self, name, default=None, message=None): + def get_param( + self, + name: str, + default: Any = None, + message: str | None = None, + ) -> Any: """Just a wrapper for Analyzer.__get_param. :param name: Name of the parameter to get. JSON-like syntax, e.g. `config.username` @@ -159,7 +174,12 @@ def get_param(self, name, default=None, message=None): return self.__get_param(self._input, name, default, message) - def get_env(self, key, default=None, message=None): + def get_env( + self, + key: str, + default: Any = None, + message: str | None = None, + ) -> Any: """Wrapper for getting configuration values from the environment. :param key: Key of the environment variable to get. :param default: Default value, if not found. Default: None @@ -174,7 +194,8 @@ def get_env(self, key, default=None, message=None): self.error(message) return default - def error(self, message, ensure_ascii=False): + # def error(self, message: str, ensure_ascii: bool = False): + def error(self, message: str, ensure_ascii: bool = False) -> NoReturn: """Stop analyzer with an error message. Changing ensure_ascii can be helpful when stuck with ascii <-> utf-8 issues. @@ -203,7 +224,7 @@ def error(self, message, ensure_ascii=False): # Force exit after error sys.exit(1) - def summary(self, raw): + def summary(self, raw: dict) -> dict: """Returns a summary, needed for 'short.html' template. Overwrite it for your needs! @@ -211,10 +232,10 @@ def summary(self, raw): :returns: by default return an empty dict""" return {} - def artifacts(self, raw): + def artifacts(self, raw: dict) -> list[dict]: return [] - def report(self, output, ensure_ascii=False): + def report(self, output: dict, ensure_ascii: bool = False) -> None: """Returns a json dict via stdout. :param output: worker output. @@ -222,6 +243,6 @@ def report(self, output, ensure_ascii=False): self.__write_output(output, ensure_ascii=ensure_ascii) - def run(self): + def run(self) -> None: """Overwritten by analyzers""" pass diff --git a/pyproject.toml b/pyproject.toml index 8ed0e73..7b7e63f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,9 @@ dev = ["cortexutils[audit, lint, test, build]", "nox"] [tool.setuptools.packages.find] include = ["cortexutils*"] +[tool.setuptools.package-data] +cortexutils = ["py.typed"] + [tool.coverage.run] omit = ["tests/*"] From b5266ed7804d2f5dbdf81f262c2a4e37fc12e431 Mon Sep 17 00:00:00 2001 From: Juan <43722131+JuanTecedor@users.noreply.github.com> Date: Tue, 16 Jun 2026 22:44:25 +0200 Subject: [PATCH 2/2] Remove commented out function header --- cortexutils/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cortexutils/worker.py b/cortexutils/worker.py index 7013b38..e55fead 100644 --- a/cortexutils/worker.py +++ b/cortexutils/worker.py @@ -194,7 +194,6 @@ def get_env( self.error(message) return default - # def error(self, message: str, ensure_ascii: bool = False): def error(self, message: str, ensure_ascii: bool = False) -> NoReturn: """Stop analyzer with an error message.