diff --git a/src/evdev/device.py b/src/evdev/device.py index 73b0acb..878a937 100644 --- a/src/evdev/device.py +++ b/src/evdev/device.py @@ -1,6 +1,6 @@ import contextlib import os -from typing import NamedTuple, Tuple, Union +from typing import Dict, Iterator, List, Literal, NamedTuple, Tuple, Union, overload from . import _input, ecodes, util @@ -95,7 +95,7 @@ class DeviceInfo(NamedTuple): product: int version: int - def __str__(self): + def __str__(self) -> str: msg = "bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}" return msg.format(*self) # pylint: disable=not-an-iterable @@ -151,7 +151,7 @@ def __init__(self, dev: Union[str, bytes, os.PathLike]): #: The number of force feedback effects the device can keep in its memory. self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd) - def __del__(self): + def __del__(self) -> None: if hasattr(self, "fd") and self.fd is not None: try: self.close() @@ -176,7 +176,13 @@ def _capabilities(self, absinfo: bool = True): return res - def capabilities(self, verbose: bool = False, absinfo: bool = True): + @overload + def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]: + ... + @overload + def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]: + ... + def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]: """ Return the event types that this device supports as a mapping of supported event types to lists of handled event codes. @@ -263,7 +269,7 @@ def leds(self, verbose: bool = False): return leds - def set_led(self, led_num: int, value: int): + def set_led(self, led_num: int, value: int) -> None: """ Set the state of the selected LED. @@ -279,18 +285,18 @@ def __eq__(self, other): """ return isinstance(other, self.__class__) and self.info == other.info and self.path == other.path - def __str__(self): + def __str__(self) -> str: msg = 'device {}, name "{}", phys "{}", uniq "{}"' return msg.format(self.path, self.name, self.phys, self.uniq or "") - def __repr__(self): + def __repr__(self) -> str: msg = (self.__class__.__name__, self.path) return "{}({!r})".format(*msg) def __fspath__(self): return self.path - def close(self): + def close(self) -> None: if self.fd > -1: try: super().close() @@ -298,7 +304,7 @@ def close(self): finally: self.fd = -1 - def grab(self): + def grab(self) -> None: """ Grab input device using ``EVIOCGRAB`` - other applications will be unable to receive events until the device is released. Only @@ -311,7 +317,7 @@ def grab(self): _input.ioctl_EVIOCGRAB(self.fd, 1) - def ungrab(self): + def ungrab(self) -> None: """ Release device if it has been already grabbed (uses `EVIOCGRAB`). @@ -324,7 +330,7 @@ def ungrab(self): _input.ioctl_EVIOCGRAB(self.fd, 0) @contextlib.contextmanager - def grab_context(self): + def grab_context(self) -> Iterator[None]: """ A context manager for the duration of which only the current process will be able to receive events from the device. @@ -342,7 +348,7 @@ def upload_effect(self, effect: "ff.Effect"): ff_id = _input.upload_effect(self.fd, data) return ff_id - def erase_effect(self, ff_id): + def erase_effect(self, ff_id) -> None: """ Erase a force effect from a force feedback device. This also stops the effect. @@ -402,7 +408,7 @@ def absinfo(self, axis_num: int): """ return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num)) - def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None): + def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None: """ Update :class:`AbsInfo` values. Only specified values will be overwritten. diff --git a/src/evdev/eventio_async.py b/src/evdev/eventio_async.py index fb8bcd2..4af1aab 100644 --- a/src/evdev/eventio_async.py +++ b/src/evdev/eventio_async.py @@ -1,11 +1,57 @@ import asyncio import select +import sys from . import eventio +from .events import InputEvent # needed for compatibility from .eventio import EvdevError +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing import Any as Self + + +class ReadIterator: + def __init__(self, device): + self.current_batch = iter(()) + self.device = device + + # Standard iterator protocol. + def __iter__(self) -> Self: + return self + + def __next__(self) -> InputEvent: + try: + # Read from the previous batch of events. + return next(self.current_batch) + except StopIteration: + r, w, x = select.select([self.device.fd], [], []) + self.current_batch = self.device.read() + return next(self.current_batch) + + def __aiter__(self) -> Self: + return self + + def __anext__(self) -> "asyncio.Future[InputEvent]": + future = asyncio.Future() + try: + # Read from the previous batch of events. + future.set_result(next(self.current_batch)) + except StopIteration: + + def next_batch_ready(batch): + try: + self.current_batch = batch.result() + future.set_result(next(self.current_batch)) + except Exception as e: + future.set_exception(e) + + self.device.async_read().add_done_callback(next_batch_ready) + return future + class EventIO(eventio.EventIO): def _do_when_readable(self, callback): @@ -42,7 +88,7 @@ def async_read(self): self._do_when_readable(lambda: self._set_result(future, self.read)) return future - def async_read_loop(self): + def async_read_loop(self) -> ReadIterator: """ Return an iterator that yields input events. This iterator is compatible with the ``async for`` syntax. @@ -58,42 +104,3 @@ def close(self): # no event loop present, so there is nothing to # remove the reader from. Ignore pass - - -class ReadIterator: - def __init__(self, device): - self.current_batch = iter(()) - self.device = device - - # Standard iterator protocol. - def __iter__(self): - return self - - def __next__(self): - try: - # Read from the previous batch of events. - return next(self.current_batch) - except StopIteration: - r, w, x = select.select([self.device.fd], [], []) - self.current_batch = self.device.read() - return next(self.current_batch) - - def __aiter__(self): - return self - - def __anext__(self): - future = asyncio.Future() - try: - # Read from the previous batch of events. - future.set_result(next(self.current_batch)) - except StopIteration: - - def next_batch_ready(batch): - try: - self.current_batch = batch.result() - future.set_result(next(self.current_batch)) - except Exception as e: - future.set_exception(e) - - self.device.async_read().add_done_callback(next_batch_ready) - return future diff --git a/src/evdev/util.py b/src/evdev/util.py index b84ef09..f873655 100644 --- a/src/evdev/util.py +++ b/src/evdev/util.py @@ -6,7 +6,7 @@ from typing import Union, List from . import ecodes -from .events import event_factory +from .events import InputEvent, event_factory def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]: @@ -32,7 +32,7 @@ def is_device(fn: Union[str, bytes, os.PathLike]) -> bool: return True -def categorize(event): +def categorize(event: InputEvent) -> InputEvent: """ Categorize an event according to its type.