diff --git a/iotlabcli/associations.py b/iotlabcli/associations.py index 49c426d..61471dd 100644 --- a/iotlabcli/associations.py +++ b/iotlabcli/associations.py @@ -34,14 +34,16 @@ import abc import collections.abc +from collections.abc import Callable +from typing import Any -def _disabled_method(*_): +def _disabled_method(*_: Any) -> None: """Disabled method.""" raise AttributeError -def setattrdefault(obj, attribute, default=None): +def setattrdefault(obj: Any, attribute: str, default: Any = None) -> Any: """Setdefault for attribute. Returns 'attribute' value if defined or set it to default and return it. @@ -69,7 +71,7 @@ class _Association( VALUE = None VALUE_SORT_KEY = None - def __init__(self, key, value): # pylint:disable=super-init-not-called + def __init__(self, key: Any, value: Any) -> None: # pylint:disable=super-init-not-called # Don't call 'dict' init, only used for json dumping self._concrete_class() self.key = key @@ -135,7 +137,7 @@ def from_dict(cls, assocdict): return cls(assocdict[cls._keyattr()], assocdict[cls._valueattr()]) @staticmethod - def staticclassattribute(function): + def staticclassattribute(function: Callable[..., Any] | None) -> Any: """Return given function as a staticmethod, handle None as None. This allows storing the function as a class attribute. @@ -143,7 +145,9 @@ def staticclassattribute(function): return staticmethod(function) if function is not None else None @classmethod - def for_key_value(cls, key, value, sortkey=None): + def for_key_value( + cls, key: str, value: str, sortkey: Callable[..., Any] | None = None + ) -> type["_Association"]: """Create association class for assoctype.""" name = f"{key.title()}{value.title()}Association" @@ -222,12 +226,14 @@ class AssociationsMap( Inherit list to be json serialized to a list. """ - def __init__(self, assoctype, resource, sortkey=None): + def __init__( + self, assoctype: str, resource: str, sortkey: Callable[..., Any] | None = None + ) -> None: # pylint:disable=super-init-not-called list.__init__(self) self.assoc_class = _Association.for_key_value(assoctype, resource, sortkey) - self._map = {} + self._map: dict[Any, Any] = {} def __getitem__(self, key): return self._map[key].value @@ -244,7 +250,7 @@ def __setitem__(self, key, value): except KeyError: self._add(key, value) - def extendvalues(self, key, values): + def extendvalues(self, key: Any, values: Any) -> Any: """Extend values for `key`.""" self.setdefault(key, []).extend(values) return self[key] @@ -261,7 +267,13 @@ def _add(self, key, value): list.sort(self, key=lambda x: x.key) @classmethod - def from_list(cls, assoclist, assoctype, resource, sortkey=None): + def from_list( + cls, + assoclist: list[Any] | None, + assoctype: str, + resource: str, + sortkey: Callable[..., Any] | None = None, + ) -> "AssociationsMap | None": """Create AssociationsMap from assoclist.""" if assoclist is None: return None @@ -306,7 +318,11 @@ def items(self): __setslice__ = property(_disabled_method) -def associationsmapdict_from_dict(assocsdict, resource, sortkey=None): +def associationsmapdict_from_dict( + assocsdict: dict[str, Any] | None, + resource: str, + sortkey: Callable[..., Any] | None = None, +) -> dict[str, AssociationsMap] | None: """Create a dict of AssociationsMap from `assocsdict` for `resource`.""" if assocsdict is None: return None diff --git a/iotlabcli/auth.py b/iotlabcli/auth.py index 70f64f1..bf03c98 100644 --- a/iotlabcli/auth.py +++ b/iotlabcli/auth.py @@ -22,17 +22,19 @@ """Authentication file management""" import getpass +import logging import os import os.path from base64 import b64decode, b64encode from iotlabcli.rest import Api -import logging RC_FILE = os.getenv("IOTLAB_PASSWORD_FILE") or os.path.expanduser("~/.iotlabrc") -def get_user_credentials(username=None, password=None): +def get_user_credentials( + username: str | None = None, password: str | None = None +) -> tuple[str | None, str | None]: """Return user credentials. If provided in arguments return them, if password missing, ask on console, or try to read them from password file""" @@ -46,13 +48,13 @@ def get_user_credentials(username=None, password=None): return username, password -def check_user_credentials(username, password): +def check_user_credentials(username: str, password: str) -> bool: """Check that the given credentials are valid""" api = Api(username, password) return api.check_credential() -def write_password_file(username, password): +def write_password_file(username: str, password: str) -> None: """Create a password file for basic authentication http when command-line option username and password are used We write .iotlabrc file in user home directory with format username:base64(password) @@ -69,14 +71,14 @@ def write_password_file(username, password): pass_file.write(f"{username}:{enc_password}") -def _read_password_file(): +def _read_password_file() -> tuple[str | None, str | None]: """Try to read password file (.iotlabrc) in user home directory when command-line option username and password are not used. If password file exist whe return username and password for basic auth http authentication """ if not os.path.exists(RC_FILE): - logging.info(f"Reading password file: no such file or directory: '{RC_FILE}'") + logging.info("Reading password file: no such file or directory: '%s'", RC_FILE) return None, None try: with open(RC_FILE, "r") as password_file: @@ -91,7 +93,7 @@ def _read_password_file(): IDENTITY_FILE = os.path.expanduser("~/.ssh/id_rsa") -def add_ssh_key(identity_file=None): +def add_ssh_key(identity_file: str | None = None) -> None: """Install ssh key into user's iot-lab account""" if identity_file is None: @@ -113,7 +115,7 @@ def add_ssh_key(identity_file=None): api.set_ssh_keys(keys_json) -def ssh_keys(): +def ssh_keys() -> None: """List ssh keys configured into user's iot-lab account""" api = Api(*get_user_credentials()) diff --git a/iotlabcli/experiment.py b/iotlabcli/experiment.py index fce22b7..0db30be 100644 --- a/iotlabcli/experiment.py +++ b/iotlabcli/experiment.py @@ -21,9 +21,14 @@ """Implement the 'experiment' requests""" +from __future__ import annotations + import json import time +from collections.abc import Callable, Sequence +from dataclasses import dataclass from os.path import basename +from typing import Any try: # pylint: disable=import-error,no-name-in-module @@ -47,14 +52,14 @@ def submit_experiment( # pylint:disable=too-many-arguments,too-many-positional-arguments - api, - name, - duration, - resources, - start_time=None, - print_json=False, - sites_assocs=None, -): + api: Any, + name: str | None, + duration: int, + resources: list[dict[str, Any]], + start_time: int | None = None, + print_json: bool = False, + sites_assocs: Sequence[SiteAssociationTuple] | None = None, +) -> Any: """Submit user experiment with JSON Encoder serialization object Experiment and firmware(s). If submission is accepted by scheduler OAR we print JSONObject response with id submission. @@ -97,7 +102,7 @@ def submit_experiment( # pylint:disable=too-many-arguments,too-many-positional- return api.submit_experiment(exp_files) -def stop_experiment(api, exp_id): +def stop_experiment(api: Any, exp_id: int) -> Any: """Stop user experiment submission. :param api: API Rest api object @@ -106,7 +111,7 @@ def stop_experiment(api, exp_id): return api.stop_experiment(exp_id) -def get_experiments_list(api, state, limit, offset): +def get_experiments_list(api: Any, state: str | None, limit: int, offset: int) -> Any: """Get the experiment list with the specific restriction: :param state: State of the experiment :param limit: maximum number of outputs @@ -116,7 +121,7 @@ def get_experiments_list(api, state, limit, offset): return api.get_experiments(state, limit, offset) -def get_experiment(api, exp_id, option=""): +def get_experiment(api: Any, exp_id: int, option: str = "") -> Any: """Get user experiment's description : :param api: API Rest api object @@ -138,7 +143,7 @@ def get_experiment(api, exp_id, option=""): return result -def get_active_experiments(api, running_only=True): +def get_active_experiments(api: Any, running_only: bool = True) -> dict[str, list[int]]: """Get active experiments with it's state. :param api: API Rest api object @@ -150,7 +155,9 @@ def get_active_experiments(api, running_only=True): return exp_by_states -def load_experiment(api, exp_desc_path, files_list=()): +def load_experiment( + api: Any, exp_desc_path: str, files_list: Sequence[str] = () +) -> Any: """Load and submit user experiment description with firmware(s) Firmwares and scripts required for experiment will be loaded from @@ -176,7 +183,7 @@ def load_experiment(api, exp_desc_path, files_list=()): return api.submit_experiment(exp_files) -def _files_with_filespath(files, filespath): +def _files_with_filespath(files: list[str], filespath: Sequence[str]) -> list[str]: """Return `files` updated with `filespath`. Return a `files` list with path taken from `filespath` if basename @@ -209,7 +216,9 @@ def _files_with_filespath(files, filespath): return sorted(updatedfiles) -def reload_experiment(api, exp_id, duration=None, start_time=None): +def reload_experiment( + api: Any, exp_id: int, duration: int | None = None, start_time: int | None = None +) -> Any: """Reload given experiment, duration and start_time can be adapted. :param api: API Rest api object @@ -229,7 +238,9 @@ def reload_experiment(api, exp_id, duration=None, start_time=None): return api.reload_experiment(exp_id, exp_json) -def info_experiment(api, list_id=False, site=None, **selections): +def info_experiment( + api: Any, list_id: bool = False, site: str | None = None, **selections: str +) -> Any: """Print testbed information for user experiment submission: * nodes description * nodes description in short mode @@ -243,7 +254,7 @@ def info_experiment(api, list_id=False, site=None, **selections): return api.get_nodes(list_id, site, **selections) -def script_experiment(api, exp_id, command, *options): +def script_experiment(api: Any, exp_id: int, command: str, *options: Any) -> Any: """Upload an run scripts on sites. :param api: API Rest api object @@ -266,7 +277,7 @@ def script_experiment(api, exp_id, command, *options): return res -def _script_run_files_dict(*site_associations): +def _script_run_files_dict(*site_associations: SiteAssociationTuple) -> Any: """Return script start files dict. Returns dict with format @@ -285,7 +296,8 @@ def _script_run_files_dict(*site_associations): # Save association and files associations = {} - for sites, assocs in site_associations: + for site_assoc in site_associations: + sites, assocs = site_assoc.sites, site_assoc.associations for assoctype, assocname in assocs.items(): _add_siteassoc_to_dict(associations, sites, assoctype, assocname) inserted_assocs = files_dict.add_files_from_dict( @@ -298,14 +310,16 @@ def _script_run_files_dict(*site_associations): return files_dict -def _add_siteassoc_to_dict(assocs, sites, assoctype, assocname): +def _add_siteassoc_to_dict( + assocs: dict[str, Any], sites: tuple[str, ...], assoctype: str, assocname: str +) -> None: """Add given site association to 'assocs' dict.""" name = site_association_name(assoctype, assocname) assoc = assocs.setdefault(assoctype, AssociationsMap(assoctype, "sites")) assoc.extendvalues(name, sites) -def _check_sites_uniq(*site_associations): +def _check_sites_uniq(*site_associations: SiteAssociationTuple) -> None: """Check that sites are uniq >>> _check_sites_uniq(site_association('grenoble', script='script'), @@ -325,13 +339,13 @@ def _check_sites_uniq(*site_associations): def wait_experiment( # pylint: disable=too-many-arguments,too-many-positional-arguments - api, - exp_id, - states="Running", - step=5, - timeout=WAIT_TIMEOUT_DEFAULT, - cancel_on_timeout=False, -): + api: Any, + exp_id: int, + states: str = "Running", + step: int = 5, + timeout: float = WAIT_TIMEOUT_DEFAULT, + cancel_on_timeout: bool = False, +) -> str | None: """Wait for the experiment to be in `states`. Also returns if Terminated or Error @@ -365,7 +379,7 @@ def _stop_function(): ) -def _states_from_str(states_str): +def _states_from_str(states_str: str) -> list[str]: """Return list of states from comma separated string. Also verify given states are valid. @@ -376,7 +390,9 @@ def _states_from_str(states_str): STOPPED_STATES = set(_states_from_str("Terminated,Error")) -def _raise_timeout_msg(exp_str, stop_fct, cancel_on_timeout): +def _raise_timeout_msg( + exp_str: str, stop_fct: Callable[[], None], cancel_on_timeout: bool +) -> None: msg = "Timeout reached" if cancel_on_timeout: msg += f", cancelling experiment {exp_str}" @@ -386,14 +402,14 @@ def _raise_timeout_msg(exp_str, stop_fct, cancel_on_timeout): def wait_state( # pylint: disable=too-many-arguments,too-many-positional-arguments - state_fct, - stop_fct, - exp_str, - states="Running", - step=5, - timeout=WAIT_TIMEOUT_DEFAULT, - cancel_on_timeout=False, -): + state_fct: Callable[[], str], + stop_fct: Callable[[], None], + exp_str: str, + states: str = "Running", + step: int = 5, + timeout: float = WAIT_TIMEOUT_DEFAULT, + cancel_on_timeout: bool = False, +) -> str | None: """Wait until `state_fct` returns a state in `states` and also Terminated or Error @@ -423,7 +439,7 @@ def wait_state( # pylint: disable=too-many-arguments,too-many-positional-argume return None # pragma: no cover -def _timeout(start_time, timeout): +def _timeout(start_time: float, timeout: float) -> bool: """Return if timeout is reached. :param start_time: initial time @@ -433,7 +449,12 @@ def _timeout(start_time, timeout): return time.time() > start_time + timeout -def exp_resources(nodes, firmware_path=None, profile_name=None, **associations): +def exp_resources( + nodes: list[str] | "AliasNodes", + firmware_path: str | None = None, + profile_name: str | None = None, + **associations: str, +) -> dict[str, Any]: """Create an experiment resources dict. :param nodes: a list of nodes url or a AliasNodes object @@ -460,12 +481,15 @@ def exp_resources(nodes, firmware_path=None, profile_name=None, **associations): return resources -SiteAssociationTuple = collections.namedtuple( - "SiteAssociationTuple", ["sites", "associations"] -) +@dataclass(frozen=True) +class SiteAssociationTuple: + """Holds the sites and keyword associations for a site association.""" + + sites: tuple + associations: dict -def site_association(*sites, **kwassociations): +def site_association(*sites: str, **kwassociations: str) -> SiteAssociationTuple: """Return a site_association tuple.""" if not sites: raise ValueError("No sites given") @@ -496,7 +520,14 @@ class AliasNodes: # pylint: disable=too-few-public-methods _alias = 0 # static count of current alias number - def __init__(self, nbnodes, site, archi, mobile=False, _alias=None): + def __init__( + self, + nbnodes: int, + site: str, + archi: str, + mobile: bool = False, + _alias: str | None = None, + ) -> None: """ { "alias":"1", @@ -517,7 +548,7 @@ def __init__(self, nbnodes, site, archi, mobile=False, _alias=None): } @classmethod - def _alias_uid(cls, alias=None): + def _alias_uid(cls, alias: str | int | None = None) -> str: """Return an unique uid string. if alias is given, return it as a String @@ -551,7 +582,9 @@ class _Experiment: # pylint:disable=too-many-instance-attributes ASSOCATTR_FMT = "{}associations" - def __init__(self, name, duration, start_time=None): + def __init__( + self, name: str | None, duration: int, start_time: int | None = None + ) -> None: self.duration = duration self.reservation = start_time self.name = name @@ -630,7 +663,7 @@ def _load_assocs( ) self.siteassociations = associationsmapdict_from_dict(siteassociations, "sites") - def _set_type(self, exp_type): + def _set_type(self, exp_type: str) -> None: """Set current experiment type. If type was already set and is different ValueError is raised """ @@ -707,6 +740,7 @@ def set_alias_nodes(self, alias_nodes): @property def _register_nodes(self): """Register nodes with the correct method according to exp `type`.""" + assert self.type is not None _register_fct_dict = { "physical": self.set_physical_nodes, "alias": self.set_alias_nodes, @@ -727,7 +761,7 @@ def filenames(self): return files -def setattr_if_none(obj, attr, default): +def setattr_if_none(obj: Any, attr: str, default: Any) -> Any: """Set attribute as `default` if None :returns: attribute value after update @@ -739,13 +773,13 @@ def setattr_if_none(obj, attr, default): return getattr(obj, attr) -def _write_experiment_archive(exp_id, data): +def _write_experiment_archive(exp_id: int, data: bytes) -> None: """Write experiment archive contained in 'data' to 'exp_id.tar.gz'""" with open(f"{exp_id}.tar.gz", "wb") as archive: archive.write(data) -def nodes_association_name(assoctype, assocname): +def nodes_association_name(assoctype: str, assocname: str) -> str: """Adapt assocname depending of assoctype. Return basename(assocname) if assoctype is a file-association. @@ -753,7 +787,7 @@ def nodes_association_name(assoctype, assocname): return _basename_if_in(assocname, assoctype, NODES_ASSOCIATIONS_FILE_ASSOCS) -def site_association_name(assoctype, assocname): +def site_association_name(assoctype: str, assocname: str) -> str: """Adapt assocname depending on assoctype. * Return basename(assocname) if assoctype is a file-association. @@ -761,7 +795,12 @@ def site_association_name(assoctype, assocname): return _basename_if_in(assocname, assoctype, SITE_ASSOCIATIONS_FILE_ASSOCS) -def _basename_if_in(value, key, container, transform=basename): +def _basename_if_in( + value: str, + key: str, + container: tuple[str, ...], + transform: Callable[[str], str] = basename, +) -> str: """Return basename if in. >>> _basename_if_in('a/b', 1, [1]) diff --git a/iotlabcli/helpers.py b/iotlabcli/helpers.py index c15c3cb..c16b325 100644 --- a/iotlabcli/helpers.py +++ b/iotlabcli/helpers.py @@ -27,6 +27,8 @@ import os import sys import warnings +from collections.abc import Callable, Iterable +from typing import Any, cast OAR_STATES = [ "Waiting", @@ -47,7 +49,9 @@ ) -def get_current_experiment(api, experiment_id=None, running_only=True): +def get_current_experiment( + api: Any, experiment_id: int | None = None, running_only: bool = True +) -> int: """Return the given experiment or get the currently running one. If running_only is false, try to return the experiment the most advanced Waiting < toLaunch < Launching < Running""" @@ -68,7 +72,7 @@ def get_current_experiment(api, experiment_id=None, running_only=True): return exp_id -def exps_by_states_dict(api, states): +def exps_by_states_dict(api: Any, states: list[str]) -> dict[str, list[int]]: """Return current experiment in `states` as a per state dict""" # exps == [{'state': 'Waiting', 'id': 10134, ...}, @@ -83,7 +87,7 @@ def exps_by_states_dict(api, states): return exp_states_d # {'Waiting': [10134, 10135], 'Running': [10130]} -def get_current_exp(exp_by_states, states): # noqa: C901 +def get_current_exp(exp_by_states: dict[str, list[int]], states: list[str]) -> int: # noqa: C901 """Current experiment is the first state in `states` where there is only one experiment in `exp_by_states`. :raises: ValueError if there is no experiment or if there are multiple @@ -135,7 +139,7 @@ def get_current_exp(exp_by_states, states): # noqa: C901 return res -def node_url_sort_key(node_url): +def node_url_sort_key(node_url: str) -> int | tuple[str, str, int]: """ >>> node_url_sort_key("m3-2.grenoble.iot-lab.info") ('grenoble', 'm3', 2) @@ -164,7 +168,7 @@ def node_url_sort_key(node_url): return site, node_type, int(num_str) -def md5(data): +def md5(data: bytes) -> str: """calculate the md5 hash of the file""" hash_md5 = hashlib.md5() hash_md5.update(data) @@ -177,17 +181,17 @@ class FilesDict(dict): so __setitem__ is overriden to check that """ - def __init__(self): + def __init__(self) -> None: dict.__init__(self) - def __setitem__(self, key, val): + def __setitem__(self, key: str, val: str | bytes) -> None: """Prevent adding a new different value to an existing key""" if key not in self: dict.__setitem__(self, key, val) elif self[key] != val: raise ValueError(f"Has different values for same key {key!r}") - def add_file(self, file_path): + def add_file(self, file_path: str | None) -> str | None: """Add a file to the dictionary. :param file_path the path of the file to add :returns the id of the file in the dict @@ -198,7 +202,7 @@ def add_file(self, file_path): if file_path is None: return None key = os.path.basename(file_path) - value = read_file(file_path, "b") + value = cast(bytes, read_file(file_path, "b")) try: self[key] = value except ValueError: @@ -209,7 +213,9 @@ def add_file(self, file_path): return key - def add_files_from_dict(self, keys, files_dict): + def add_files_from_dict( + self, keys: tuple[str, ...] | list[str], files_dict: dict[str, Any] + ) -> dict[str, str]: """Add 'keys' files from 'files_dict' if present. :param keys: which keys to consider inside the input files dict :param files_dict: @@ -225,14 +231,14 @@ def add_files_from_dict(self, keys, files_dict): add_firmware = add_file # Deprecated -def read_custom_api_url(): +def read_custom_api_url() -> str | None: """Return the customized api url from: * config file in /.iotlab.api-url * or environment variable IOTLAB_API_URL """ try: # try getting url from config file - api_url = read_file("~/.iotlab.api-url").strip() + api_url = cast(str, read_file("~/.iotlab.api-url")).strip() except IOError: # try getting url from environment variable, None if undefined api_url = os.getenv("IOTLAB_API_URL") @@ -242,13 +248,13 @@ def read_custom_api_url(): return api_url -def read_file(file_path, opt=""): +def read_file(file_path: str, opt: str = "") -> str | bytes: """Open and read a file""" with open(os.path.expanduser(file_path), "r" + opt) as _fd: # expand '~' return _fd.read() -def check_experiment_state(state_str=None): +def check_experiment_state(state_str: str | None = None) -> str: """Check that given states are valid if None given, return all states >>> check_experiment_state('Running') @@ -275,7 +281,7 @@ def check_experiment_state(state_str=None): return state_str -def json_dumps(obj): +def json_dumps(obj: Any) -> str: """Dumps data to json""" class _Encoder(json.JSONEncoder): # pylint: disable=too-few-public-methods @@ -287,7 +293,7 @@ def default(self, o): # pylint: disable=method-hidden return json.dumps(obj, cls=_Encoder, sort_keys=True, indent=4) -def flatten_list_list(list_list): +def flatten_list_list(list_list: Iterable[Iterable[Any]]) -> list[Any]: """Flatten given list of list. >>> flatten_list_list([[1, 2, 3], [4], [5], [6, 7, 8]]) @@ -296,7 +302,7 @@ def flatten_list_list(list_list): return list(itertools.chain.from_iterable(list_list)) -def deprecate_warn_cmd(old_cmd, new_cmd, stacklevel): +def deprecate_warn_cmd(old_cmd: str, new_cmd: str, stacklevel: int) -> None: """Display a deprecation warning message""" warnings.simplefilter("always", DeprecationWarning) warnings.warn( @@ -306,7 +312,7 @@ def deprecate_warn_cmd(old_cmd, new_cmd, stacklevel): ) -def deprecate_cmd(cmd_func, old_cmd, new_cmd): +def deprecate_cmd(cmd_func: Callable[[], None], old_cmd: str, new_cmd: str) -> None: """Display a deprecation warning message and run command.""" deprecate_warn_cmd(old_cmd, new_cmd, 4) cmd_func() diff --git a/iotlabcli/node.py b/iotlabcli/node.py index ea67870..13c3b02 100644 --- a/iotlabcli/node.py +++ b/iotlabcli/node.py @@ -22,6 +22,7 @@ """Implement the 'node' requests""" import json +from typing import Any from iotlabcli import helpers @@ -29,7 +30,9 @@ EXPERIMENT = "experiment.json" -def _node_command_flash(api, exp_id, nodes_list, cmd_opt): +def _node_command_flash( + api: Any, exp_id: int, nodes_list: list[str], cmd_opt: str +) -> Any: assert cmd_opt is not None, "`cmd_opt` required for update" files = helpers.FilesDict() @@ -42,7 +45,9 @@ def _node_command_flash(api, exp_id, nodes_list, cmd_opt): return api.node_update(exp_id, files) -def _node_command_profile_load(api, exp_id, nodes_list, cmd_opt): +def _node_command_profile_load( + api: Any, exp_id: int, nodes_list: list[str], cmd_opt: str +) -> Any: assert cmd_opt is not None, "`cmd_opt` required for update" files = helpers.FilesDict() @@ -51,7 +56,13 @@ def _node_command_profile_load(api, exp_id, nodes_list, cmd_opt): return api.node_profile_load(exp_id, files) -def node_command(api, command, exp_id, nodes_list=(), cmd_opt=None): +def node_command( + api: Any, + command: str, + exp_id: int, + nodes_list: list[str] | tuple[()] = (), + cmd_opt: str | None = None, +) -> Any: """Launch commands (start, stop, reset, update) on nodes (JSONArray) user experiment diff --git a/iotlabcli/parser/auth.py b/iotlabcli/parser/auth.py index a29a05d..3b5be33 100644 --- a/iotlabcli/parser/auth.py +++ b/iotlabcli/parser/auth.py @@ -24,7 +24,8 @@ import argparse import getpass import sys -from argparse import RawTextHelpFormatter +from argparse import ArgumentParser, RawTextHelpFormatter +from typing import Any from iotlabcli import auth from iotlabcli.parser import common @@ -38,7 +39,7 @@ """ -def parse_options(): +def parse_options() -> ArgumentParser: """Handle iotlab-auth command-line options with argparse""" parent_parser = common.base_parser() # We create top level parser @@ -73,7 +74,7 @@ def parse_options(): return parser -def auth_parse_and_run(opts): # noqa: C901 +def auth_parse_and_run(opts: argparse.Namespace) -> Any: # noqa: C901 """Parse namespace 'opts' object and execute requested command :returns: result object """ @@ -103,7 +104,7 @@ def auth_parse_and_run(opts): # noqa: C901 raise RuntimeError("Wrong login:password") -def main(args=None): +def main(args: list[str] | None = None) -> None: """Main command-line execution loop." """ args = args or sys.argv[1:] parser = parse_options() diff --git a/iotlabcli/parser/common.py b/iotlabcli/parser/common.py index 9d886bb..1ef9ee5 100644 --- a/iotlabcli/parser/common.py +++ b/iotlabcli/parser/common.py @@ -25,7 +25,10 @@ import contextlib import errno import sys +from argparse import ArgumentParser from collections import OrderedDict +from collections.abc import Callable, Generator +from typing import Any # pylint: disable=wrong-import-order try: @@ -43,7 +46,7 @@ DOMAIN_DNS = "iot-lab.info" -def base_parser(user_required=False): +def base_parser(user_required: bool = False) -> ArgumentParser: """Base parser giving 'user' 'password' and 'version' arguments :param user_required: set 'user' argument as required or not""" parser = argparse.ArgumentParser(add_help=False) @@ -54,21 +57,21 @@ def base_parser(user_required=False): return parser -def add_auth_arguments(parser, usr_required=False): +def add_auth_arguments(parser: ArgumentParser, usr_required: bool = False) -> None: """Add 'user' and 'password' arguments :param user_required: set 'user' argument as required or not""" parser.add_argument("-u", "--user", dest="username", required=usr_required) parser.add_argument("-p", "--password", dest="password") -def add_version(parser): +def add_version(parser: ArgumentParser) -> None: """Add 'version' argument""" parser.add_argument( "-v", "--version", action="version", version=iotlabcli.__version__ ) -def add_output_formatter(parser): +def add_output_formatter(parser: ArgumentParser) -> None: """Add '--jmespath' argument""" group = parser.add_argument_group("Output Format") group.add_argument( @@ -85,7 +88,7 @@ def add_output_formatter(parser): ) -def add_expid_arg(parser, required=False): +def add_expid_arg(parser: ArgumentParser, required: bool = False) -> None: """Add '-i' / '--id' for 'experiment_id' option.""" parser.add_argument( "-i", @@ -102,12 +105,18 @@ class HelpAction(argparse.Action): HELPMSG = None - def __call__(self, parser, namespace, values, option_string=None): + def __call__( + self, + parser: ArgumentParser, + namespace: argparse.Namespace, + values: Any, + option_string: str | None = None, + ) -> None: print(self.HELPMSG, end="") parser.exit() @classmethod - def for_message(cls, msg): + def for_message(cls, msg: str) -> type["HelpAction"]: """Create action for help message.""" class HelpActionWithMessage(cls): @@ -118,7 +127,9 @@ class HelpActionWithMessage(cls): return HelpActionWithMessage @classmethod - def add_help(cls, parser, name, description, msg): + def add_help( + cls, parser: ArgumentParser, name: str, description: str, msg: str + ) -> None: """Method to add a custom help option. :param parser: parser object to add to @@ -130,7 +141,11 @@ def add_help(cls, parser, name, description, msg): parser.add_argument(name, action=action, nargs=0, help=description) -def print_result(result, jmespath_expr=None, format_function=None): # noqa: C901 +def print_result( # noqa: C901 + result: Any, + jmespath_expr: Any = None, + format_function: Callable[[Any], str] | None = None, +) -> None: """Print result vule""" format_function = format_function or helpers.json_dumps @@ -155,7 +170,7 @@ def print_result(result, jmespath_expr=None, format_function=None): # noqa: C90 @contextlib.contextmanager -def catch_missing_auth_cli(): +def catch_missing_auth_cli() -> Generator[None, None, None]: """Catch HTTPError 401 and display a message on missing iotlab-auth.""" auth_cli_err = ( @@ -171,7 +186,11 @@ def catch_missing_auth_cli(): sys.exit(1) -def main_cli(function, parser, args=None): # noqa: C901 +def main_cli( # noqa: C901 + function: Callable[[argparse.Namespace], Any], + parser: ArgumentParser, + args: list[str] | None = None, +) -> None: """Main command-line execution.""" args = args or sys.argv[1:] try: @@ -194,13 +213,15 @@ def main_cli(function, parser, args=None): # noqa: C901 sys.exit(1) -def sites_list(): +def sites_list() -> list[str]: """Return the list of sites""" sites_dict = rest.Api.get_sites() return [site["site"] for site in sites_dict["items"]] -def check_site_with_server(site_name, _sites_list=None): +def check_site_with_server( + site_name: str, _sites_list: list[str] | None = None +) -> None: """Check if the given site exists by requesting the server list. If sites_list is given, it is used instead of doing a remote request @@ -217,13 +238,13 @@ def check_site_with_server(site_name, _sites_list=None): raise argparse.ArgumentTypeError(f"Unknown site name {site_name!r}") -def site_with_domain_checked(site, domain=DOMAIN_DNS): +def site_with_domain_checked(site: str, domain: str = DOMAIN_DNS) -> str: """Return site with domain and check site exists.""" check_site_with_server(site) return f"{site}.{domain}" -def nodes_list_from_info(site, archi, nodes_str): +def nodes_list_from_info(site: str, archi: str, nodes_str: str) -> list[str]: """ Cheks archi, nodes_str format and return nodes list >>> nodes_list_from_info('grenoble', 'm3', '1-4+6+7-8') @@ -248,7 +269,7 @@ def nodes_list_from_info(site, archi, nodes_str): return nodes_url_list -def nodes_id_list(archi, nodes_list): +def nodes_id_list(archi: str, nodes_list: str) -> list[str]: """Expand short nodes_list 'archi', '1-5+6+8-12' to a regular nodes list """ @@ -261,7 +282,7 @@ def nodes_id_list(archi, nodes_list): return nodes -def _expand_minus_str(minus_nodes_str): +def _expand_minus_str(minus_nodes_str: str) -> list[int] | range: """Expand a '1-5' or '6' string to a list on integer :raises: ValueError on invalid values """ @@ -283,7 +304,7 @@ def _expand_minus_str(minus_nodes_str): return res -def expand_short_nodes_list(nodes_str): +def expand_short_nodes_list(nodes_str: str) -> list[int]: """Expand short nodes_list '1-5+6+8-12' to a regular nodes list >>> expand_short_nodes_list('1-4+6+7-8') @@ -319,7 +340,7 @@ def expand_short_nodes_list(nodes_str): raise ValueError(f"Invalid nodes list: {nodes_str} ([0-9+-])") -def add_nodes_selection_list(parser): +def add_nodes_selection_list(parser: ArgumentParser) -> None: """Add '-l' and '-e' experiment nodes selection""" list_group = parser.add_mutually_exclusive_group() @@ -341,7 +362,12 @@ def add_nodes_selection_list(parser): ) -def list_nodes(api, exp_id, nodes_ll=None, excl_nodes_ll=None): +def list_nodes( + api: Any, + exp_id: int, + nodes_ll: list[list[str]] | None = None, + excl_nodes_ll: list[list[str]] | None = None, +) -> list[str]: """Return the list of nodes where the command will apply""" if nodes_ll is not None: @@ -361,14 +387,14 @@ def list_nodes(api, exp_id, nodes_ll=None, excl_nodes_ll=None): return sorted(nodes, key=helpers.node_url_sort_key) -def _get_experiment_nodes_list(api, exp_id): +def _get_experiment_nodes_list(api: Any, exp_id: int) -> list[str]: """Get the nodes_list for given experiment""" exp_nodes = api.get_experiment_info(exp_id, "nodes") nodes = [res["network_address"] for res in exp_nodes["items"]] return nodes -def nodes_list_from_str(nodes_list_str): +def nodes_list_from_str(nodes_list_str: str) -> list[str]: """Convert the nodes_list_str to a list of nodes hostname Checks that given site exist :param nodes_list_str: short nodes format: site_name,archi,node_id_list diff --git a/iotlabcli/parser/experiment.py b/iotlabcli/parser/experiment.py index 538e941..a540540 100644 --- a/iotlabcli/parser/experiment.py +++ b/iotlabcli/parser/experiment.py @@ -26,6 +26,7 @@ import time from argparse import ArgumentParser, RawTextHelpFormatter from datetime import datetime +from typing import Any from iotlabcli import auth, experiment, helpers, rest from iotlabcli.parser import common, help_msgs @@ -37,7 +38,7 @@ """ -def parse_options(): +def parse_options() -> ArgumentParser: """Handle iotlab-experiment command-line options with argparse""" parent_parser = common.base_parser() @@ -277,7 +278,7 @@ def add_usage(self, usage, actions, groups, prefix=None): return parser -def parser_add_submit_subparser(subparsers): +def parser_add_submit_subparser(subparsers: Any) -> None: """Add 'submit' subparser and return it.""" submit_parser = subparsers.add_parser( "submit", @@ -336,8 +337,8 @@ def parser_add_submit_subparser(subparsers): def _parser_add_duration_and_reservation( # pylint:disable=invalid-name - subparser, duration_required -): + subparser: Any, duration_required: bool +) -> None: """Add a 'duration' and a 'reservation' argument to subparser. :param subparser: subparser instance @@ -359,7 +360,9 @@ def _parser_add_duration_and_reservation( # pylint:disable=invalid-name ) -def parser_add_wait_subparser(subparsers, expid_required=False): +def parser_add_wait_subparser( + subparsers: Any, expid_required: bool = False +) -> ArgumentParser: """Add wait experiment subparser and return it.""" wait_parser = subparsers.add_parser( "wait", @@ -393,7 +396,7 @@ def parser_add_wait_subparser(subparsers, expid_required=False): return wait_parser -def parser_add_script_subparser(subparsers): +def parser_add_script_subparser(subparsers: Any) -> ArgumentParser: """Add suparser for 'script'.""" _script_parser = subparsers.add_parser( "script", @@ -433,7 +436,7 @@ def parser_add_script_subparser(subparsers): return _script_parser -def exp_infos_from_str(exp_str): +def exp_infos_from_str(exp_str: str) -> tuple[Any, dict[str, str]]: """Extract nodes and associations.""" try: params = exp_str.split(",") @@ -447,7 +450,7 @@ def exp_infos_from_str(exp_str): return nodes, associations -def exp_resources_from_str(exp_str): +def exp_resources_from_str(exp_str: str) -> dict[str, Any]: """Extract an 'experiment.exp_resources' from parameter string. Accepted formats: @@ -462,7 +465,7 @@ def exp_resources_from_str(exp_str): return experiment.exp_resources(nodes, firmware_path, profile_name, **associations) -def site_association_from_str(site_assoc_str): +def site_association_from_str(site_assoc_str: str) -> Any: """Extract site_association from given string. Format is: @@ -489,13 +492,13 @@ def site_association_from_str(site_assoc_str): RUN_SITE_ASSOCIATION_METAVAR = f"site,site,{RUN_SITE_ASSOCIATIONS_STR}" -def _run_associations_arg_check(script, scriptconfig=None): +def _run_associations_arg_check(script: str, scriptconfig: str | None = None) -> None: # pylint:disable=unused-argument,unnecessary-pass """To be used with **associations to check given arguments.""" pass -def run_site_association_from_str(site_assoc_str): +def run_site_association_from_str(site_assoc_str: str) -> Any: """Extract site_association and verify given associations. 'script' association is mandatory. @@ -515,7 +518,7 @@ def run_site_association_from_str(site_assoc_str): return site_association -def _valid_param(param): +def _valid_param(param: str) -> None: """Check parameter are valid for _args_kwargs. * no space @@ -527,7 +530,7 @@ def _valid_param(param): raise ValueError(f"name required for kwarg '{param}'") -def _args_kwargs(params): +def _args_kwargs(params: list[str]) -> tuple[list[str], dict[str, str]]: """Separate args and kwargs from params. `args` must all be at first and `kwargs` at the end @@ -589,7 +592,7 @@ def _args_kwargs(params): return args, kwargs -def _check_args_then_kwargs(params): +def _check_args_then_kwargs(params: list[str]) -> None: """Check that args are first, and then kwargs only.""" is_kwargs = ["=" in param for param in params] # Should be many False then many True @@ -597,7 +600,7 @@ def _check_args_then_kwargs(params): raise ValueError("got argument after keyword argument") -def _add_key_value(kwargs, key, value=""): +def _add_key_value(kwargs: dict[str, str], key: str, value: str = "") -> None: """Add `key`,`value` if value is not empty. Raise an error if key exists. @@ -609,7 +612,7 @@ def _add_key_value(kwargs, key, value=""): kwargs[key] = value -def _submit_args_to_dict(firmware="", profile=""): +def _submit_args_to_dict(firmware: str = "", profile: str = "") -> dict[str, str]: """Return kwargs for this arguments. Remove empty values""" kwargs = {} if firmware: @@ -619,7 +622,9 @@ def _submit_args_to_dict(firmware="", profile=""): return kwargs -def _merge_assocs_args_d_kwargs(args_dict, kwargs): +def _merge_assocs_args_d_kwargs( + args_dict: dict[str, str], kwargs: dict[str, str] +) -> dict[str, str]: """Merge args_dict and kwargs. Detect duplicate keys.""" error_str = 'Association "%s" provided by argument and keyword argument' @@ -636,7 +641,7 @@ def _merge_assocs_args_d_kwargs(args_dict, kwargs): SUBMIT_ASSOC_ARGS_KWARGS = "[firmware][,profile][,assoc=value][,assoc=...]" -def _extract_associations(params): +def _extract_associations(params: list[str]) -> dict[str, str]: """Extract 'associations'. Firmware, profile at positional args then keyword arguments. @@ -654,7 +659,7 @@ def _extract_associations(params): return associations -def get_alias_properties(properties_str): +def get_alias_properties(properties_str: str) -> tuple[str, str, str | None]: """Extract nodes selection properties from given properties_str >>> get_alias_properties("site=grenoble+archi=wsn430:cc1101+mobile=True") @@ -685,7 +690,9 @@ def get_alias_properties(properties_str): return site, archi, mobile -def _alias_properties_from_kwargs(site, archi, mobile=None): +def _alias_properties_from_kwargs( + site: str, archi: str, mobile: str | None = None +) -> tuple[str, str, str | None]: """To be used with **properties, checks given keys. Returns values. @@ -693,7 +700,7 @@ def _alias_properties_from_kwargs(site, archi, mobile=None): return site, archi, mobile -def _properties_str_to_dict(properties_str): +def _properties_str_to_dict(properties_str: str) -> dict[str, str]: """Extract a properties string to a dict: >>> _properties_str_to_dict('a=1+b=3') == {'a': '1', 'b': '3'} @@ -721,7 +728,7 @@ def _properties_str_to_dict(properties_str): return prop_dict -def mobile_from_mobile_str(mobile_str=None): +def mobile_from_mobile_str(mobile_str: str | None = None) -> bool: """Return the value to put in experiment json from mobile_str. >>> mobile_from_mobile_str(None) @@ -755,18 +762,18 @@ def mobile_from_mobile_str(mobile_str=None): raise ValueError("Invalid 'mobile' property: %r. Should be in 'true|false|0|1'") -def _mobile_str_true_false(mobile_str): +def _mobile_str_true_false(mobile_str: str) -> bool: """Try checking for 'true', 'false' in any case.""" mobile_str = mobile_str.title() # upper first letter return {"True": True, "False": False}[mobile_str] -def _mobile_str_as_bool(mobile_str): +def _mobile_str_as_bool(mobile_str: str) -> bool: """Try converting to an int-bool.""" return bool(int(mobile_str)) -def _extract_firmware_nodes_list(param_list): +def _extract_firmware_nodes_list(param_list: list[str]) -> tuple[Any, list[str]]: """ Extract a firmware nodes list from param_list param_list is modified by the function call @@ -796,7 +803,7 @@ def _extract_firmware_nodes_list(param_list): return nodes, param_list -def submit_experiment_parser(opts): +def submit_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' and execute requested 'submit' command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -812,7 +819,7 @@ def submit_experiment_parser(opts): ) -def script_parser(opts): +def script_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' and execute requestes 'run' command.""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -823,7 +830,7 @@ def script_parser(opts): return experiment.script_experiment(api, exp_id, command, *options) -def _script_command_options(opts): +def _script_command_options(opts: argparse.Namespace) -> tuple[str, list[Any]]: """Extract `command` and `options` from argparse 'opts'.""" if opts.run_script_site is not None: command = "run" @@ -840,7 +847,7 @@ def _script_command_options(opts): return command, options -def stop_experiment_parser(opts): +def stop_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested 'stop' command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -849,7 +856,7 @@ def stop_experiment_parser(opts): return experiment.stop_experiment(api, exp_id) -def get_experiment_parser(opts): +def get_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested 'get' command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) @@ -867,7 +874,7 @@ def get_experiment_parser(opts): return experiment.get_experiment(api, exp_id, _deprecate_cmd(opts.get_cmd)) -def _deprecate_cmd(cmd): +def _deprecate_cmd(cmd: str) -> str: if cmd == "resources": new_cmd = "nodes" helpers.deprecate_warn_cmd(cmd, new_cmd, 8) @@ -878,7 +885,7 @@ def _deprecate_cmd(cmd): return cmd -def _get_experiment_attr(api, opts): +def _get_experiment_attr(api: Any, opts: argparse.Namespace) -> dict[str, Any]: """Return start_time or state experiment attribute with old api format""" assert opts.get_cmd in ( "state", @@ -897,7 +904,7 @@ def _get_experiment_attr(api, opts): return {"start_time": int(timestamp), "local_date": local_date} -def load_experiment_parser(opts): +def load_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested 'load' command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) @@ -906,7 +913,7 @@ def load_experiment_parser(opts): return experiment.load_experiment(api, opts.path_file, files) -def reload_experiment_parser(opts): +def reload_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested 'reload' command.""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -915,7 +922,7 @@ def reload_experiment_parser(opts): ) -def info_experiment_parser(opts): +def info_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested 'info' command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -925,7 +932,7 @@ def info_experiment_parser(opts): return experiment.info_experiment(api, opts.list_id, **selection) -def wait_experiment_parser(opts): +def wait_experiment_parser(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested 'wait' command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) @@ -940,7 +947,7 @@ def wait_experiment_parser(opts): ) -def experiment_parse_and_run(opts): +def experiment_parse_and_run(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested command Return result object """ @@ -958,7 +965,7 @@ def experiment_parse_and_run(opts): return command(opts) -def main(args=None): +def main(args: list[str] | None = None) -> None: """Main command-line execution loop.""" args = args or sys.argv[1:] parser = parse_options() diff --git a/iotlabcli/parser/help_msgs.py b/iotlabcli/parser/help_msgs.py index 3d48022..720495a 100644 --- a/iotlabcli/parser/help_msgs.py +++ b/iotlabcli/parser/help_msgs.py @@ -27,7 +27,7 @@ HELP_DIR = os.path.join(CUR_DIR, "help") -def _read_help_file(name): +def _read_help_file(name: str) -> str: """Read help file.""" help_file = os.path.join(HELP_DIR, name) @@ -150,19 +150,6 @@ def _read_help_file(name): """ -ADD_EPILOG_WSN430 = """ - -Examples : - # Add a profile with consumption measure configuration - $ iotlab-profile addwsn430 -n consum -current -voltage -power -cfreq 5000 - - # Add a profile with radio rssi measures - $ iotlab-profile addwsn430 -n rssi_profile -rfreq 500 - - # Add a profile with sensor measures - $ iotlab-profile addwsn430 -n sensors -sfreq 1000 -temperature -luminosity -""" - ADD_EPILOG_M3_A8 = """ Examples : diff --git a/iotlabcli/parser/main.py b/iotlabcli/parser/main.py index 00f4473..5d49c14 100644 --- a/iotlabcli/parser/main.py +++ b/iotlabcli/parser/main.py @@ -23,6 +23,7 @@ import sys from argparse import ArgumentParser +from typing import Any import iotlabcli.parser.auth import iotlabcli.parser.experiment @@ -56,7 +57,7 @@ oml_plot_tools = None # pylint:disable=invalid-name -def parse_subcommands(commands, args): +def parse_subcommands(commands: dict[str, Any], args: list[str]) -> Any: """common function to parse `iotlab` or other with subcommands""" parser = ArgumentParser() @@ -69,7 +70,7 @@ def parse_subcommands(commands, args): return commands[opts.command](args[1:]) -def oml_plot(args): +def oml_plot(args: list[str]) -> None: """'iotlab oml-plot' main function.""" commands = { @@ -80,7 +81,7 @@ def oml_plot(args): parse_subcommands(commands, args) -def main(args=None): +def main(args: list[str] | None = None) -> Any: """'iotlab' main function.""" args = args or sys.argv[1:] diff --git a/iotlabcli/parser/node.py b/iotlabcli/parser/node.py index 3785b63..c6daf0f 100644 --- a/iotlabcli/parser/node.py +++ b/iotlabcli/parser/node.py @@ -23,7 +23,8 @@ import argparse import sys -from argparse import RawTextHelpFormatter +from argparse import ArgumentParser, RawTextHelpFormatter +from typing import Any import iotlabcli.node from iotlabcli import auth, helpers, rest @@ -54,7 +55,7 @@ """ -def parse_options(): +def parse_options() -> ArgumentParser: """Handle iotlab-node command-line options with argparse""" parent_parser = common.base_parser() @@ -170,7 +171,7 @@ def parse_options(): return parser -def node_parse_and_run(opts): +def node_parse_and_run(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -189,7 +190,7 @@ def node_parse_and_run(opts): return iotlabcli.node.node_command(api, command, exp_id, nodes, cmd_opt) -def _deprecate_cmd(opts): +def _deprecate_cmd(opts: argparse.Namespace) -> None: if opts.command == "update-idle": new_cmd = "flash-idle" helpers.deprecate_warn_cmd(opts.command, new_cmd, 7) @@ -199,7 +200,7 @@ def _deprecate_cmd(opts): opts.firmware_path = opts.up_firmware_path -def _node_parse_command_and_opt(**opts_dict): +def _node_parse_command_and_opt(**opts_dict: Any) -> tuple[str, str]: """Return 'command' and 'command_opt' from **opts_dict. @@ -247,7 +248,7 @@ def _node_parse_command_and_opt(**opts_dict): raise ValueError("Unknown command") -def main(args=None): +def main(args: list[str] | None = None) -> None: """Main command-line execution loop." """ args = args or sys.argv[1:] parser = parse_options() diff --git a/iotlabcli/parser/profile.py b/iotlabcli/parser/profile.py index 19b41e5..b0c2f26 100644 --- a/iotlabcli/parser/profile.py +++ b/iotlabcli/parser/profile.py @@ -24,11 +24,12 @@ import argparse import json import sys -from argparse import RawTextHelpFormatter +from argparse import ArgumentParser, RawTextHelpFormatter +from typing import Any from iotlabcli import auth, helpers, rest from iotlabcli.parser import common, help_msgs -from iotlabcli.profile import ProfileA8, ProfileCustom, ProfileM3, ProfileWSN430 +from iotlabcli.profile import ProfileA8, ProfileCustom, ProfileM3 PROFILE_PARSER = """ @@ -40,7 +41,7 @@ """ -def parse_options(): +def parse_options() -> ArgumentParser: """Handle iotlab-profile command-line opts with argparse""" parent_parser = common.base_parser() # We create top level parser @@ -54,13 +55,6 @@ def parse_options(): subparsers = parser.add_subparsers(dest="command") subparsers.required = True # not required by default in Python3 - add_wsn430_parser = subparsers.add_parser( - "addwsn430", - help="add wsn430 user profile", - epilog=help_msgs.ADD_EPILOG_WSN430, - formatter_class=RawTextHelpFormatter, - ) - # # m3 profile # @@ -95,76 +89,6 @@ def parse_options(): get_parser = subparsers.add_parser("get", help="get user's profile") load_parser = subparsers.add_parser("load", help="load user profile") - # - # WSN430 profile - # - add_wsn430_parser.add_argument("-n", "--name", required=True, help="profile name") - add_wsn430_parser.add_argument( - "-j", - "--json", - action="store_true", - help="print profile JSON representation without add it", - ) - - add_wsn430_parser.add_argument( - "-p", - "--power", - dest="power_mode", - default="dc", - help="power mode (dc by default)", - choices=ProfileWSN430.choices["power_mode"], - ) - - # WSN430 Consumption - group_wsn430_consumption = add_wsn430_parser.add_argument_group( - "Consumption measure" - ) - - group_wsn430_consumption.add_argument( - "-cfreq", - dest="cfreq", - type=int, - choices=ProfileWSN430.choices["consumption"]["frequency"], - help="frequency measure (ms)", - ) - - group_wsn430_consumption.add_argument( - "-power", action="store_true", help="power measure" - ) - group_wsn430_consumption.add_argument( - "-voltage", action="store_true", help="voltage measure" - ) - group_wsn430_consumption.add_argument( - "-current", action="store_true", help="current measure" - ) - - # WSN430 Radio - group_wsn430_radio = add_wsn430_parser.add_argument_group("Radio measure") - group_wsn430_radio.add_argument( - "-rfreq", - dest="rfreq", - type=int, - choices=ProfileWSN430.choices["radio"]["frequency"], - help="frequency measure (ms)", - ) - - # WSN430 Sensor - group_wsn430_sensor = add_wsn430_parser.add_argument_group("Sensor measure") - group_wsn430_sensor.add_argument( - "-sfreq", - dest="sfreq", - type=int, - choices=ProfileWSN430.choices["sensor"]["frequency"], - help="frequency measure (ms)", - ) - - group_wsn430_sensor.add_argument( - "-temperature", action="store_true", help="temperature measure" - ) - group_wsn430_sensor.add_argument( - "-luminosity", action="store_true", help="luminosity measure" - ) - # Delete Profile del_parser.add_argument("-n", "--name", required=True, help="profile name") @@ -194,7 +118,7 @@ def parse_options(): return parser -def add_m3_a8_parser(node_type, subparser): +def add_m3_a8_parser(node_type: str, subparser: ArgumentParser) -> None: """Add options for m3 and a8 parsers as they are the same""" node_class = {"M3": ProfileM3, "A8": ProfileA8, "CUSTOM": ProfileCustom}[node_type] @@ -294,23 +218,7 @@ def add_m3_a8_parser(node_type, subparser): ) -def _wsn430_profile(opts): - """Create a wsn430 profile from namespace object""" - profile = ProfileWSN430(profilename=opts.name, power=opts.power_mode) - profile.set_consumption( - frequency=opts.cfreq, - power=opts.power, - voltage=opts.voltage, - current=opts.current, - ) - profile.set_radio(frequency=opts.rfreq) - profile.set_sensors( - frequency=opts.sfreq, temperature=opts.temperature, luminosity=opts.luminosity - ) - return profile - - -def _m3_a8_profile(opts, node_class): +def _m3_a8_profile(opts: argparse.Namespace, node_class: type) -> Any: """Create a node_class profile from namespace object""" profile = node_class(profilename=opts.name, power=opts.power_mode) profile.set_consumption( @@ -329,22 +237,22 @@ def _m3_a8_profile(opts, node_class): return profile -def _m3_profile(opts): +def _m3_profile(opts: argparse.Namespace) -> Any: """Create a m3 profile from namespace object""" return _m3_a8_profile(opts, ProfileM3) -def _a8_profile(opts): +def _a8_profile(opts: argparse.Namespace) -> Any: """Create a a8 profile from namespace object""" return _m3_a8_profile(opts, ProfileA8) -def _custom_profile(opts): +def _custom_profile(opts: argparse.Namespace) -> Any: """Create a a8 profile from namespace object""" return _m3_a8_profile(opts, ProfileCustom) -def _add_profile(api, profile, json_out=False): +def _add_profile(api: Any, profile: Any, json_out: bool = False) -> Any: """Add user profile. if json, dump json dict to stdout""" if json_out: return profile @@ -352,7 +260,7 @@ def _add_profile(api, profile, json_out=False): return api.add_profile(profile) -def add_profile_parser(api, opts): +def add_profile_parser(api: Any, opts: argparse.Namespace) -> Any: """Add user profile with JSON Encoder serialization object Profile. :param api: API Rest api object @@ -360,7 +268,6 @@ def add_profile_parser(api, opts): :type opts: Namespace object with opts attribute """ profile_func_d = { - "addwsn430": _wsn430_profile, "addm3": _m3_profile, "adda8": _a8_profile, "addcustom": _custom_profile, @@ -373,7 +280,7 @@ def add_profile_parser(api, opts): raise ValueError(str(err)) -def load_profile_parser(api, opts): +def load_profile_parser(api: Any, opts: argparse.Namespace) -> Any: """Load and add user profile description :param api: API Rest api object @@ -384,7 +291,7 @@ def load_profile_parser(api, opts): return _add_profile(api, profile) -def del_profile_parser(api, opts): +def del_profile_parser(api: Any, opts: argparse.Namespace) -> Any: """Delete user profile description :param api: API Rest api object @@ -394,7 +301,7 @@ def del_profile_parser(api, opts): return api.del_profile(opts.name) -def get_profile_parser(api, opts): +def get_profile_parser(api: Any, opts: argparse.Namespace) -> Any: """Get user profile description _ print JSONObject profile description _ print JSONObject profile's list description @@ -413,13 +320,12 @@ def get_profile_parser(api, opts): return profile_dict -def profile_parse_and_run(opts): +def profile_parse_and_run(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) fct_parser = { - "addwsn430": add_profile_parser, "addm3": add_profile_parser, "adda8": add_profile_parser, "addcustom": add_profile_parser, @@ -431,7 +337,7 @@ def profile_parse_and_run(opts): return fct_parser(api, opts) -def main(args=None): +def main(args: list[str] | None = None) -> None: """Main command-line execution loop." """ args = args or sys.argv[1:] parser = parse_options() diff --git a/iotlabcli/parser/robot.py b/iotlabcli/parser/robot.py index fb4324c..8be865b 100644 --- a/iotlabcli/parser/robot.py +++ b/iotlabcli/parser/robot.py @@ -23,6 +23,8 @@ import argparse import sys +from argparse import ArgumentParser +from typing import Any import iotlabcli.robot from iotlabcli import auth, helpers, rest @@ -32,7 +34,7 @@ on a turtlebot.""" -def parse_options(): +def parse_options() -> ArgumentParser: """Handle iotlab-robot command-line options with argparse""" parent_parser = common.base_parser() @@ -84,7 +86,7 @@ def parse_options(): return parser -def robot_parse_and_run(opts): # noqa # Too complex but straightforward +def robot_parse_and_run(opts: argparse.Namespace) -> Any: # noqa # Too complex but straightforward """Parse namespace 'opts' object and execute requested command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -111,7 +113,7 @@ def robot_parse_and_run(opts): # noqa # Too complex but straightforward return ret -def main(args=None): +def main(args: list[str] | None = None) -> None: """Main command-line execution loop." """ args = args or sys.argv[1:] parser = parse_options() diff --git a/iotlabcli/parser/status.py b/iotlabcli/parser/status.py index 332d115..627b72a 100644 --- a/iotlabcli/parser/status.py +++ b/iotlabcli/parser/status.py @@ -23,7 +23,8 @@ import argparse import sys -from argparse import RawTextHelpFormatter +from argparse import ArgumentParser, RawTextHelpFormatter +from typing import Any import iotlabcli.status from iotlabcli import auth, rest @@ -51,7 +52,7 @@ """ -def parse_options(): +def parse_options() -> ArgumentParser: """Handle iotlab-status command-line options with argparse""" parent_parser = common.base_parser() @@ -128,7 +129,7 @@ def parse_options(): return parser -def status_parse_and_run(opts): +def status_parse_and_run(opts: argparse.Namespace) -> Any: """Parse namespace 'opts' object and execute requested command""" user, passwd = auth.get_user_credentials(opts.username, opts.password) api = rest.Api(user, passwd) @@ -136,7 +137,7 @@ def status_parse_and_run(opts): return iotlabcli.status.status_command(api, opts.command, **selection) -def main(args=None): +def main(args: list[str] | None = None) -> None: """Main command-line execution loop." """ args = args or sys.argv[1:] parser = parse_options() diff --git a/iotlabcli/profile.py b/iotlabcli/profile.py index 18e47de..1dbac84 100644 --- a/iotlabcli/profile.py +++ b/iotlabcli/profile.py @@ -21,9 +21,12 @@ """Class python for Profile serialization JSON""" +from dataclasses import dataclass, field + # pylint:disable=too-few-public-methods +@dataclass class ProfileM3A8: """A generic Profile for M3 and A8""" @@ -41,19 +44,27 @@ class ProfileM3A8: } arch = None - def __init__(self, profilename, power): - assert power in self.choices["power_mode"] + profilename: str + power: str + nodearch: str = field(init=False) + consumption: dict | None = field(init=False) + radio: dict | None = field(init=False) + + def __post_init__(self) -> None: + assert self.power in self.choices["power_mode"] assert self.arch is not None, "Using Generic class" self.nodearch = self.arch - self.profilename = profilename - self.power = power - self.consumption = None self.radio = None def set_consumption( # pylint: disable=too-many-arguments,too-many-positional-arguments - self, period, average, power=False, voltage=False, current=False - ): + self, + period: int | None, + average: int | None, + power: bool = False, + voltage: bool = False, + current: bool = False, + ) -> None: """Configure consumption measures""" if not power and not voltage and not current: return @@ -70,7 +81,13 @@ def set_consumption( # pylint: disable=too-many-arguments,too-many-positional-a "current": current, } - def set_radio(self, mode, channels, period=None, num_per_channel=None): + def set_radio( + self, + mode: str | None, + channels: list[int] | None, + period: int | None = None, + num_per_channel: int | None = None, + ) -> None: """Configure radio measures""" if not mode: return @@ -87,7 +104,12 @@ def set_radio(self, mode, channels, period=None, num_per_channel=None): } config_radio[mode](channels, period, num_per_channel) - def _cfg_radio_rssi(self, channels, period, num_per_channel=None): + def _cfg_radio_rssi( + self, + channels: list[int], + period: int, + num_per_channel: int | None = None, + ) -> None: """Check parameters for rssi measures and set config""" num_per_channel = num_per_channel or 0 @@ -106,7 +128,12 @@ def _cfg_radio_rssi(self, channels, period, num_per_channel=None): self.radio["period"] = period self.radio["num_per_channel"] = num_per_channel - def _cfg_radio_sniffer(self, channels, period=None, num_per_channel=None): + def _cfg_radio_sniffer( + self, + channels: list[int], + period: int | None = None, + num_per_channel: int | None = None, + ) -> None: """Check parameters for sniffer measures and set the configuration""" # 'Period' and multiple channels should be handled later when supported @@ -120,9 +147,6 @@ def _cfg_radio_sniffer(self, channels, period=None, num_per_channel=None): self.radio["period"] = None self.radio["num_per_channel"] = None - def __eq__(self, other): # pragma: no cover - return self.__dict__ == other.__dict__ - class ProfileM3(ProfileM3A8): """A Profile measure class for M3.""" @@ -140,66 +164,3 @@ class ProfileCustom(ProfileM3A8): """A Profile measure class for Custom.""" arch = "custom" - - -class ProfileWSN430: - """A Profile measure class for WSN430""" - - choices = { - "power_mode": ["dc", "battery"], - "consumption": {"frequency": [5000, 1000, 500, 100, 70]}, - "radio": {"frequency": [5000, 1000, 500]}, - "sensor": {"frequency": [30000, 10000, 5000, 1000]}, - } - - def __init__(self, profilename, power): - assert power in ProfileWSN430.choices["power_mode"] - self.nodearch = "wsn430" - self.profilename = profilename - self.power = power - - self.consumption = None - self.radio = None - self.sensor = None - - def set_consumption(self, frequency, power=False, voltage=False, current=False): - """Configure consumption measures""" - if not power and not voltage and not current: - return - _err = "Required 'frequency' for consumption measure" - assert frequency is not None, _err - - assert frequency in self.choices["consumption"]["frequency"] - self.consumption = { - "frequency": frequency, - "power": power, - "voltage": voltage, - "current": current, - } - - def set_radio(self, frequency): - """Configure radio measures""" - if not frequency: - return - assert frequency in self.choices["radio"]["frequency"] - self.radio = { - "frequency": frequency, - "rssi": True, - } - - def set_sensors(self, frequency, temperature=False, luminosity=False): - """Configure sensor measures""" - if not temperature and not luminosity: - return - _err = "Required 'frequency' for sensor measure" - assert frequency is not None, _err - - assert frequency in self.choices["sensor"]["frequency"] - self.sensor = { - "frequency": frequency, - "luminosity": luminosity, - "temperature": temperature, - } - - def __eq__(self, other): # pragma: no cover - return self.__dict__ == other.__dict__ diff --git a/iotlabcli/rest.py b/iotlabcli/rest.py index 467e8f3..3138bc0 100644 --- a/iotlabcli/rest.py +++ b/iotlabcli/rest.py @@ -29,6 +29,7 @@ """ import sys +from typing import Any import requests from requests.auth import HTTPBasicAuth @@ -75,7 +76,7 @@ class Api: # pylint:disable=too-many-public-methods _cache = {} url = helpers.read_custom_api_url() or "https://www.iot-lab.info/api/" - def __init__(self, username, password): + def __init__(self, username: str | None, password: str | None) -> None: """ :param username: username for Basic password auth :param password: password for Basic auth @@ -83,11 +84,13 @@ def __init__(self, username, password): """ self.auth = HTTPBasicAuth(username, password) - def get_sites_details(self): + def get_sites_details(self) -> Any: """Get testbed sites details""" return self.method("sites/details") - def get_nodes(self, list_id=False, site=None, **selections): + def get_nodes( + self, list_id: bool = False, site: str | None = None, **selections: str + ) -> Any: """Get testbed nodes description :param list_id: return result in 'exp_list' format '3-12+35' @@ -104,7 +107,7 @@ def get_nodes(self, list_id=False, site=None, **selections): url += "?" + urlencode(sorted(list(selections.items()))) return self.method(url) - def submit_experiment(self, files): + def submit_experiment(self, files: dict[str, Any]) -> Any: """Submit user experiment :param files: experiment description and firmware(s) @@ -113,18 +116,20 @@ def submit_experiment(self, files): """ return self.method("experiments", "post", files=files) - def get_experiments(self, state="Running", limit=0, offset=0): + def get_experiments( + self, state: str = "Running", limit: int = 0, offset: int = 0 + ) -> Any: """Get user's experiment :returns JSONObject """ queryset = f"state={state}&limit={limit}&offset={offset}" return self.method(f"experiments?{queryset}") - def get_running_experiments(self): + def get_running_experiments(self) -> Any: """Get testbed running experiments""" return self.method("experiments/running") - def get_experiment_info(self, expid, option=""): + def get_experiment_info(self, expid: int, option: str = "") -> Any: """Get user experiment description. :param expid: experiment id submission (e.g. OAR scheduler) :param option: Restrict to some values: @@ -141,14 +146,16 @@ def get_experiment_info(self, expid, option=""): url += f"/{option}" return self.method(url, raw=option == "data") - def stop_experiment(self, expid): + def stop_experiment(self, expid: int) -> Any: """Stop user experiment. :param id: experiment id submission (e.g. OAR scheduler) """ return self.method(f"experiments/{expid}", "delete") - def reload_experiment(self, expid, exp_json=None): + def reload_experiment( + self, expid: int, exp_json: dict[str, str] | None = None + ) -> Any: """Reload user experiment. :param expid: experiment id submission (e.g. OAR scheduler) @@ -160,7 +167,13 @@ def reload_experiment(self, expid, exp_json=None): # Node commands - def node_command(self, command, expid, nodes=(), option=None): + def node_command( + self, + command: str, + expid: int, + nodes: list[str] | tuple[()] = (), + option: str | None = None, + ) -> Any: """Lanch 'command' on user experiment list nodes :param id: experiment id submission (e.g. OAR scheduler) @@ -173,7 +186,9 @@ def node_command(self, command, expid, nodes=(), option=None): url += f"/{option}" return self.method(url, "post", json=nodes) - def node_update(self, expid, files, binary=False): + def node_update( + self, expid: int, files: dict[str, Any], binary: bool = False + ) -> Any: """Launch update command (flash firmware) on user experiment list nodes @@ -187,7 +202,7 @@ def node_update(self, expid, files, binary=False): url += "/binary" return self.method(url, "post", files=files) - def node_profile_load(self, expid, files): + def node_profile_load(self, expid: int, files: dict[str, Any]) -> Any: """Update profile with profile json on user experiment list nodes @@ -199,7 +214,13 @@ def node_profile_load(self, expid, files): return self.method(f"experiments/{expid}/nodes/monitoring", "post", files=files) # script - def script_command(self, expid, command, files=None, json=None): + def script_command( + self, + expid: int, + command: str, + files: dict[str, Any] | None = None, + json: Any = None, + ) -> Any: """Execute scripts on sites. :param expid: experiment id submission (e.g. OAR scheduler) @@ -219,7 +240,7 @@ def script_command(self, expid, command, files=None, json=None): # Profile methods - def get_profiles(self, archi=None): + def get_profiles(self, archi: str | None = None) -> Any: """Get user's list profile description :returns JSONObject @@ -229,7 +250,7 @@ def get_profiles(self, archi=None): url += f"?archi={archi}" return self.method(url) - def get_profile(self, name): + def get_profile(self, name: str) -> Any: """Get user profile description. :param name: profile name @@ -238,7 +259,7 @@ def get_profile(self, name): """ return self.method(f"monitoring/{name}") - def add_profile(self, profile): + def add_profile(self, profile: Any) -> Any: """Add user profile :param profile: profile description @@ -250,7 +271,7 @@ def add_profile(self, profile): ret = self.method("monitoring", "post", json=profile) return ret - def del_profile(self, name): + def del_profile(self, name: str) -> Any: """Delete user profile :param profile_name: name @@ -259,7 +280,7 @@ def del_profile(self, name): ret = self.method(f"monitoring/{name}", "delete") return ret - def check_credential(self): + def check_credential(self) -> bool: """Check that the credentials are valid""" try: self.method("user") @@ -271,18 +292,20 @@ def check_credential(self): # ssh keys api - def get_ssh_keys(self): + def get_ssh_keys(self) -> Any: """Get user's registered ssh keys""" ret = self.method("user/keys") return ret - def set_ssh_keys(self, ssh_keys_json): + def set_ssh_keys(self, ssh_keys_json: Any) -> None: """Set user's ssh keys""" self.method("user/keys", "post", json=ssh_keys_json, raw=True) # robot - def robot_command(self, command, expid, nodes=()): + def robot_command( + self, command: str, expid: int, nodes: list[str] | tuple[()] = () + ) -> Any: """Run 'status' on user experiment robot list nodes. :param id: experiment id submission (e.g. OAR scheduler) @@ -291,7 +314,9 @@ def robot_command(self, command, expid, nodes=()): assert command in ("status",) return self.method(f"experiments/{expid}/robots/{command}", "post", json=nodes) - def robot_update_mobility(self, expid, name, nodes=()): + def robot_update_mobility( + self, expid: int, name: str, nodes: list[str] | tuple[()] = () + ) -> Any: """Update mobility on user experiment robot list nodes. :param id: experiment id submission (e.g. OAR scheduler) @@ -301,7 +326,7 @@ def robot_update_mobility(self, expid, name, nodes=()): return self.method(url, "post", json=nodes) @classmethod - def get_robot_mapfile(cls, site, mapfile): + def get_robot_mapfile(cls, site: str, mapfile: str) -> Any: """Download robot mapfile. :params site: Map info for site @@ -315,7 +340,7 @@ def get_robot_mapfile(cls, site, mapfile): url = f"robots/{site}/{mapfile}" return api.method(url, raw=raw) - def get_circuits(self, **selections): + def get_circuits(self, **selections: str) -> Any: """List circuits mobilities.""" url = "mobilities/circuits" if selections: @@ -324,18 +349,18 @@ def get_circuits(self, **selections): url += "?" + urlencode(sorted(list(selections.items()))) return self.method(url) - def get_circuit(self, name): + def get_circuit(self, name: str) -> Any: """Get user mobilities.""" return self.method(f"mobilities/circuits/{name}") def method( # pylint:disable=too-many-arguments,too-many-positional-arguments self, - url, - method="get", - json=None, - files=None, - raw=False, - ): + url: str, + method: str = "get", + json: Any = None, + files: dict[str, Any] | None = None, + raw: bool = False, + ) -> Any: """Call http `method` on iot-lab-url/'url'. :param url: url of API. @@ -357,7 +382,7 @@ def method( # pylint:disable=too-many-arguments,too-many-positional-arguments return self._raise_http_error(_url, req) @staticmethod - def _request(url, method, **kwargs): + def _request(url: str, method: str, **kwargs: Any) -> requests.Response: """Call http `method` on 'url' :param url: url of API. @@ -369,7 +394,7 @@ def _request(url, method, **kwargs): raise RuntimeError(sys.exc_info()) @staticmethod - def _raise_http_error(url, req): + def _raise_http_error(url: str, req: requests.Response) -> None: """Raises HTTP error for 'url' and 'req'""" # Indent req.text to pretty print it later indented_lines = ["\t" + line for line in req.text.splitlines(True)] @@ -377,7 +402,7 @@ def _raise_http_error(url, req): raise HTTPError(url, req.status_code, msg, req.headers, None) @classmethod - def get_sites(cls): + def get_sites(cls) -> Any: """Get testbed sites description May be run unauthicated @@ -386,7 +411,7 @@ def get_sites(cls): return cls._get_with_cache("sites") @classmethod - def _get_with_cache(cls, url): + def _get_with_cache(cls, url: str) -> Any: """Get resource from either cache or rest :returns JSONObject """ diff --git a/iotlabcli/robot.py b/iotlabcli/robot.py index 72365b3..ab3d773 100644 --- a/iotlabcli/robot.py +++ b/iotlabcli/robot.py @@ -21,10 +21,14 @@ """Implement the 'robot' requests""" +from typing import Any + from iotlabcli.rest import Api -def robot_command(api, command, exp_id, nodes_list=()): +def robot_command( + api: Api, command: str, exp_id: int, nodes_list: list[str] | tuple[()] = () +) -> Any: """Launch commands ('status',) on nodes_list :param api: API Rest api object @@ -38,7 +42,9 @@ def robot_command(api, command, exp_id, nodes_list=()): return result -def robot_update_mobility(api, exp_id, name, nodes_list=()): +def robot_update_mobility( + api: Api, exp_id: int, name: str, nodes_list: list[str] | tuple[()] = () +) -> Any: """Update robot mobility on nodes_list. :param api: API Rest api object @@ -51,7 +57,9 @@ def robot_update_mobility(api, exp_id, name, nodes_list=()): return result -def circuit_command(api, command, name=None, **selection): +def circuit_command( + api: Api, command: str, name: str | None = None, **selection: str +) -> Any: """Run mobilities circuit commands. :param command: in ['list', 'get'] @@ -76,7 +84,7 @@ def circuit_command(api, command, name=None, **selection): return result -def robot_get_map(site): +def robot_get_map(site: str) -> dict[str, Any]: """Download all robot map files Download robot site config, map and docks list""" diff --git a/iotlabcli/status.py b/iotlabcli/status.py index 56bca18..f2b2c67 100644 --- a/iotlabcli/status.py +++ b/iotlabcli/status.py @@ -21,8 +21,10 @@ """Implement the 'status' requests""" +from typing import Any -def status_command(api, command, **selections): + +def status_command(api: Any, command: str, **selections: str) -> Any: """Launch testbed status commands :param api: API Rest api object diff --git a/iotlabcli/tests/experiment_test.py b/iotlabcli/tests/experiment_test.py index f8819df..9192b7a 100644 --- a/iotlabcli/tests/experiment_test.py +++ b/iotlabcli/tests/experiment_test.py @@ -510,7 +510,10 @@ def test_site_assoctiation(self): """Test working site associations.""" # One site / assoc assocs = experiment.site_association("grenoble", script="script.sh") - self.assertEqual(assocs, (("grenoble",), {"script": "script.sh"})) + self.assertEqual( + assocs, + experiment.SiteAssociationTuple(("grenoble",), {"script": "script.sh"}), + ) # Multiple sites / asocs assocs = experiment.site_association( @@ -518,7 +521,9 @@ def test_site_assoctiation(self): ) self.assertEqual( assocs, - (("grenoble", "strasbourg"), {"script": "script.sh", "ipv6": "2001::"}), + experiment.SiteAssociationTuple( + ("grenoble", "strasbourg"), {"script": "script.sh", "ipv6": "2001::"} + ), ) self.assertEqual(assocs.sites, ("grenoble", "strasbourg")) self.assertEqual(assocs.associations, {"script": "script.sh", "ipv6": "2001::"}) diff --git a/iotlabcli/tests/profile_parser_test.py b/iotlabcli/tests/profile_parser_test.py index 75fa70c..3991191 100644 --- a/iotlabcli/tests/profile_parser_test.py +++ b/iotlabcli/tests/profile_parser_test.py @@ -33,12 +33,6 @@ class TestMainProfileParser(MainMock): def test_main_add_parser(self): - # add simple add - profile_parser.main(["addwsn430", "-n", "profile_name", "-p", "dc"]) - self.api.add_profile.assert_called_with( - iotlabcli.profile.ProfileWSN430("profile_name", "dc") - ) - profile_parser.main(["addm3", "-n", "profile_name", "-p", "dc"]) self.api.add_profile.assert_called_with( iotlabcli.profile.ProfileM3("profile_name", "dc") @@ -107,42 +101,6 @@ def test_opts_parsing_m3(prof_m3_class): mode="sniffer", channels=[11], period=None, num_per_channel=None ) - @staticmethod - @patch("iotlabcli.parser.profile.ProfileWSN430") - def test_opts_parsing_wsn430(prof_wsn430_class): - """Test that WSN430profile parsing matches profile class - Check that default values are ok and that values are correctly passed - """ - # keep 'choices' valid for argparse - prof_wsn430_class.choices = iotlabcli.profile.ProfileWSN430.choices - profilewsn430 = prof_wsn430_class.return_value - parser = profile_parser.parse_options() - - # pylint: disable=protected-access - args = ["addwsn430", "-n", "name", "-p", "dc"] - opts = parser.parse_args(args) - profile_parser._wsn430_profile(opts) - profilewsn430.set_consumption.assert_called_with( - frequency=None, power=False, voltage=False, current=False - ) - profilewsn430.set_radio.assert_called_with(frequency=None) - profilewsn430.set_sensors.assert_called_with( - frequency=None, temperature=False, luminosity=False - ) - - args += ["-cfreq", "70", "-power", "-voltage", "-current"] - args += ["-rfreq", "500"] - args += ["-sfreq", "1000", "-luminosity", "-temperature"] - opts = parser.parse_args(args) - profile_parser._wsn430_profile(opts) - profilewsn430.set_consumption.assert_called_with( - frequency=70, power=True, voltage=True, current=True - ) - profilewsn430.set_radio.assert_called_with(frequency=500) - profilewsn430.set_sensors.assert_called_with( - frequency=1000, temperature=True, luminosity=True - ) - def test__add_profile(self): ret = profile_parser._add_profile( # pylint: disable=protected-access self.api, {"test_profile": 1}, json_out=True diff --git a/iotlabcli/tests/profile_test.py b/iotlabcli/tests/profile_test.py index d86a29b..76486e1 100644 --- a/iotlabcli/tests/profile_test.py +++ b/iotlabcli/tests/profile_test.py @@ -111,53 +111,3 @@ def test_valid_empty_profile(self): "radio": None, }, ) - - -class TestWSN430Profile(unittest.TestCase): - def test_valid_full_profile(self): - wsn430_prof = profile.ProfileWSN430("name", "dc") - wsn430_prof.set_consumption(5000, True, True, True) - wsn430_prof.set_radio(5000) - wsn430_prof.set_sensors(30000, True, True) - - self.assertEqual( - wsn430_prof.__dict__, - { - "profilename": "name", - "power": "dc", - "nodearch": "wsn430", - "consumption": { - "frequency": 5000, - "current": True, - "voltage": True, - "power": True, - }, - "radio": { - "frequency": 5000, - "rssi": True, - }, - "sensor": { - "frequency": 30000, - "luminosity": True, - "temperature": True, - }, - }, - ) - - def test_valid_empty_profile(self): - wsn430_prof = profile.ProfileWSN430("name", "dc") - wsn430_prof.set_consumption(None) - wsn430_prof.set_radio(frequency=None) - wsn430_prof.set_sensors(None) - - self.assertEqual( - wsn430_prof.__dict__, - { - "profilename": "name", - "power": "dc", - "nodearch": "wsn430", - "consumption": None, - "radio": None, - "sensor": None, - }, - )