diff --git a/GUI/controls.py b/GUI/controls.py index f711a41..e3770db 100644 --- a/GUI/controls.py +++ b/GUI/controls.py @@ -1,8 +1,5 @@ -from dm40.types import ThemePalette - - class UIControls: - def __init__(self, master, style, theme: ThemePalette): + def __init__(self, master, style, theme): self.master = master self.style = style self.theme = theme @@ -11,7 +8,7 @@ def __init__(self, master, style, theme: ThemePalette): self._init_layouts() self.apply_theme() - def use_theme(self, theme: ThemePalette) -> None: + def use_theme(self, theme) -> None: self.theme = theme self.apply_theme() diff --git a/GUI/theme_manager.py b/GUI/theme_manager.py index ed00cc7..92a986a 100644 --- a/GUI/theme_manager.py +++ b/GUI/theme_manager.py @@ -2,8 +2,7 @@ import tkinter as tk from tkinter import ttk -from dm40.types import ThemePalette -from dm40.theme_store import deserialize_theme_store_palettes +from shared.theme_store import deserialize_theme_store_palettes from GUI.widgets.helpers import theme_title_bar from GUI.widgets.themed_button import ThemedButton @@ -34,7 +33,7 @@ def __init__( self.style = style self._on_apply = on_apply - self._themes: list[ThemePalette] = deserialize_theme_store_palettes(_DEFAULT_STORE) + self._themes = deserialize_theme_store_palettes(_DEFAULT_STORE) self._active_theme_idx = self._read_active_index() self._dialog = None @@ -64,7 +63,7 @@ def list_theme_names(self) -> list[str]: def get_active_theme_index(self) -> int: return self._active_theme_idx - def get_active_theme(self) -> ThemePalette: + def get_active_theme(self): return self._themes[self._active_theme_idx] def activate_theme_index(self, theme_idx: int): @@ -234,7 +233,7 @@ def _apply_selected_theme(self): self._select_listbox_index(self._active_theme_idx) self._update_preview_from_selection() - def _activate_by_index(self, idx: int) -> tuple[ThemePalette | None, bool]: + def _activate_by_index(self, idx: int): if idx < 0 or idx >= len(self._themes): return None, False if self._active_theme_idx == idx: @@ -246,7 +245,7 @@ def _activate_by_index(self, idx: int) -> tuple[ThemePalette | None, bool]: self._update_listbox_active(old_idx) return self._themes[idx], True - def _apply_dialog_chrome(self, theme: ThemePalette): + def _apply_dialog_chrome(self, theme): dialog = self._dialog if not dialog or not dialog.winfo_exists(): return @@ -261,7 +260,7 @@ def _init_preview_styles(self): self.style.layout(_PREVIEW_BORDER, self.style.layout("Border.TFrame")) self.style.configure(_PREVIEW_BORDER, relief="solid") - def _apply_preview_colors(self, theme: ThemePalette): + def _apply_preview_colors(self, theme): self.style.configure( _PREVIEW_FRAME, background=theme.bg, diff --git a/GUI/themed_messagebox.py b/GUI/themed_messagebox.py index e442295..cf95d4f 100644 --- a/GUI/themed_messagebox.py +++ b/GUI/themed_messagebox.py @@ -137,7 +137,7 @@ def _center(self, parent): h = target_h if target_h else self.winfo_height() x = parent_x + (parent_w - w) // 2 y = parent_y + (parent_h - h) // 2 - self.geometry(f"{w}x{h}+{x}+{y}") + self.geometry("%dx%d+%d+%d" % (w, h, x, y)) def _finish(self, value): self._result = value diff --git a/GUI/widgets/find_popup.py b/GUI/widgets/find_popup.py index 3e8c48e..089edb0 100644 --- a/GUI/widgets/find_popup.py +++ b/GUI/widgets/find_popup.py @@ -1,7 +1,7 @@ import tkinter as tk from tkinter import ttk -from dm40.types import ThemePalette + class FindPopup: @@ -15,7 +15,7 @@ def __init__( self, parent: tk.Misc, text: tk.Text, - colors: ThemePalette, + colors, *, grid_opts: dict | None = None, ): @@ -69,7 +69,7 @@ def __init__( self.set_tag_colors(self._colors) self.hide(clear=False) - def set_tag_colors(self, colors: ThemePalette) -> None: + def set_tag_colors(self, colors) -> None: self._colors = colors text_fg = colors.text match_bg = colors.outline diff --git a/GUI/widgets/helpers.py b/GUI/widgets/helpers.py index e231fa1..e2f54da 100644 --- a/GUI/widgets/helpers.py +++ b/GUI/widgets/helpers.py @@ -29,11 +29,6 @@ class int32(ctypes._SimpleCData): _DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2 = intptr_t(-4) -def _hex_to_rgb(value): - b = bytes.fromhex(value[1:]) - return b[0], b[1], b[2] - - def _set_window_attribute(hwnd, attribute, value): if not hwnd: return False @@ -45,9 +40,8 @@ def _set_window_attribute(hwnd, attribute, value): return True -def _colorref_from_hex(value): - r, g, b = _hex_to_rgb(value) - return (b << 16) | (g << 8) | r +def _colorref_from_hex(value, _bf=bytes.fromhex, _ifb=int.from_bytes): + return _ifb(_bf(value[1:]), 'little') def theme_title_bar(window: tk.Tk | tk.Toplevel, *, border_color: str | None = None, diff --git a/GUI/widgets/tooltip.py b/GUI/widgets/tooltip.py index e48aa34..fd1e997 100644 --- a/GUI/widgets/tooltip.py +++ b/GUI/widgets/tooltip.py @@ -85,7 +85,7 @@ def _create_tip(self, text: str, x: int, y: int): ) self._label.pack(fill=tk.BOTH, expand=True) try: - self._tip.wm_geometry(f"+{x}+{y}") + self._tip.wm_geometry('+%s+%s' % (x, y)) except tk.TclError: pass self._current_text = text @@ -97,7 +97,7 @@ def move(self, x: int, y: int): if self._last_xy == xy: return try: - self._tip.wm_geometry(f"+{x}+{y}") + self._tip.wm_geometry('+%s+%s' % (x, y)) except tk.TclError: pass self._last_xy = xy diff --git a/GUI/widgets/waveform_view.py b/GUI/widgets/waveform_view.py index 0a39264..564af2b 100644 --- a/GUI/widgets/waveform_view.py +++ b/GUI/widgets/waveform_view.py @@ -4,7 +4,7 @@ import tkinter as tk from _collections import deque # type: ignore -from dm40.types import ThemePalette + from .tooltip import Tooltip @@ -13,7 +13,7 @@ class WaveformView(tk.Canvas): GRID_FRACS = (0.25, 0.5, 0.75) _DRAG_PX = 5 - def __init__(self, master: tk.Misc, *, colors: ThemePalette, capacity: int = 600): + def __init__(self, master: tk.Misc, *, colors, capacity: int = 600): super().__init__(master, highlightthickness=2, bd=0) self._cap = max(16, int(capacity)) self._buf: deque[float] = deque(maxlen=self._cap) @@ -272,7 +272,7 @@ def _dismiss(self, _e=None) -> None: # Theme - def set_colors(self, colors: ThemePalette) -> None: + def set_colors(self, colors) -> None: fg, trace, grid = colors.text, colors.accent, colors.outline self.itemconfigure(self._trace_line, fill=trace) self.itemconfigure(self._hover_line, fill=grid) @@ -369,14 +369,12 @@ def redraw(self) -> None: else: self._draw_sel(s, e) - # CSV save / record── - def save_buffer_csv(self, path: str) -> int: """Write current buffer to *path* as CSV. Returns row count.""" with open(path, "w", newline="") as f: f.write("Timestamp,Value\n") for ts, v in zip(self._ts, self._buf): - f.write(f"{ts},{v}\n") + f.write("%s,%s\n" % (ts, v)) return len(self._buf) def toggle_recording(self, path: str) -> bool: diff --git a/build_release.cmd b/build_release.cmd index f0b5f27..75f0103 100644 --- a/build_release.cmd +++ b/build_release.cmd @@ -2,7 +2,7 @@ setlocal EnableExtensions EnableDelayedExpansion set "DEFAULT_MODE_FLAGS=--deployment" -set "DEFAULT_CONSOLE_MODE=disable" +set "DEFAULT_CONSOLE_MODE=force" set "DEFAULT_MSVC=latest" set "DEFAULT_PYTHON=py -3.13" set "DEFAULT_OUT_DIR=build\ci\nuitka" diff --git a/dm40/app.py b/dm40/app.py index f90dcb2..67ec73a 100644 --- a/dm40/app.py +++ b/dm40/app.py @@ -1,27 +1,14 @@ -"""DM40 Tkinter application.""" - -import _thread -import time +"""DM40 device handler.""" import tkinter as tk from tkinter import ttk -from . import mini_asyncio as asyncio - -from dm40.types import ThemePalette -from GUI.controls import UIControls -from GUI.theme_manager import ThemeManager -from GUI.themed_messagebox import show_error -from GUI.widgets.autoscrollbar import AutoScrollbar -from GUI.widgets.find_popup import FindPopup -from GUI.widgets.helpers import ensure_dpi_awareness, theme_title_bar -from GUI.widgets.menubar import MENU_UP, MenuDropdown, OwnerDrawnMenuBar -from GUI.widgets.themed_button import ThemedButton -from GUI.widgets.waveform_view import WaveformView - -from .ble_worker import BleWorker -from .nanowinbt.scanner import NanoScanner -from .parsing import MODEL, Measurement, parse_device_status, parse_measurement_for_ui +from shared.ble_worker import BleWorker +from GUI.widgets.menubar import MENU_UP, MenuDropdown + +from .parsing import MODEL, MODEL_TABLE, Measurement, parse_device_status, parse_measurement_for_ui from .protocol_constants import ( + CMD_ID, + CMD_READ, COMMAND_CYCLE_GROUPS, COMMAND_KIND_LABELS, COMMAND_KIND_TO_GROUP, @@ -38,313 +25,133 @@ for _flag, (_kind_name, _rng) in FLAG_INFO.items(): _RANGE_ITEMS_BY_KIND.setdefault(_kind_name, []).append((_rng, _flag)) - -def _build_command_packet(cmd_prefix: bytes) -> bytes: - checksum = (-sum(cmd_prefix)) & 0xFF - return b"%b%c" % (cmd_prefix, checksum) +_MODEL_PREFIX = b"\xdf\x05\x03\x08\x14" -class DM40App(tk.Tk): - def __init__(self): - super().__init__() - ensure_dpi_awareness() - - self._start_time = time.monotonic() - self._title_base = "DM40" - self.title(self._title_base) - self.minsize(916, 650) - self.wm_geometry("916x650") +def _dm40_notify_filter(data: bytes) -> bool: + if data[:5] == _MODEL_PREFIX: + idx = data[9] - 0x41 + if 0 <= idx < len(MODEL_TABLE): + MODEL.model_name, MODEL.device_counts = MODEL_TABLE[idx] + return False + return True - self.style = ttk.Style(self) - self._theme_manager = ThemeManager(self, self.style, self._apply_theme) - initial_theme = self._theme_manager.get_active_theme() - self.ui = UIControls(self, self.style, theme=initial_theme) - theme_title_bar( - self, - border_color=initial_theme.outline, - caption_color=initial_theme.bg, - ) +class DM40Handler: + title = "DM40" + csv_prefix = "DM40" - self._worker: BleWorker | None = None + def __init__(self, app) -> None: + self.app = app self._last_trace_key: int | None = None self._last_device_status: tuple = (0, False, False, False) - self._devices: list = [] - self._device_index_by_address: dict[str, int] = {} - self._scan_in_progress = False - self._scan_generation = 0 - self._scan_cancel = None + self._last_measurement: Measurement | None = None self._mode_buttons: list[ttk.Button | ttk.Checkbutton] = [] self._range_button: ttk.Button | None = None self._range_menu: MenuDropdown | None = None self._toggle_vars: dict[str, tk.BooleanVar] = { - label: tk.BooleanVar(value=False) - for label in ("AUTO", "HOLD", "CAP") + label: tk.BooleanVar(value=False) for label in ("AUTO", "HOLD", "CAP") } self._cycle_groups: dict = {} self._last_base_mode_flag: int | None = None - self._last_measurement: Measurement | None = None - self._is_connected = False - - self._stats_count = 0 - self._stats_sum = 0.0 - self._stats_min = 0.0 - self._stats_max = 0.0 - - self._build_ui() - self.bind_all("", self._toggle_wave_pause) - self.bind_all("", self._save_wave_csv) - self.bind_all("", self._toggle_wave_record) - self.bind_all("", self._copy_reading) - - def _toggle_wave_pause(self, _event=None) -> None: - self._wave_view.toggle_pause() - self._refresh_status_bar() - - def _copy_reading(self, _event=None) -> None: - source = _event.widget if _event is not None else self.focus_get() - if isinstance(source, (tk.Text, tk.Entry, ttk.Entry)): - return - - m = self._last_measurement - if not m or m.kind == "---": - return - unit = f" {m.display_unit}" if m.display_unit else "" - self.clipboard_clear() - self.clipboard_append(f"{m.value_str}{unit}") - - _CSV_DIR = __compiled__.containing_dir if '__compiled__' in globals() else __file__.rsplit('\\', 2)[0] # type: ignore[name-defined] - - def _csv_path(self, prefix: str) -> str: - return f"{self._CSV_DIR}\\{prefix}_{time.strftime('%Y%m%d_%H%M%S')}.csv" - - def _save_wave_csv(self, _event=None) -> None: - self._wave_view.save_buffer_csv(self._csv_path("DM40")) - def _toggle_wave_record(self, _event=None) -> None: - if self._wave_view.recording: - self._wave_view.stop_recording() - else: - self._wave_view.toggle_recording(self._csv_path("DM40_rec")) - self._refresh_status_bar() - - def _clear_capture_data(self) -> None: - self._wave_view.clear() - self._raw_text.configure(state="normal") - self._raw_text.delete("1.0", "end") - self._raw_text.configure(state="disabled") - self._last_trace_key = None - self._stats_count = 0 - self._stats_sum = 0.0 - self._stats_min = 0.0 - self._stats_max = 0.0 - self._stats_var.set("") - self._refresh_status_bar() - - def _build_ui(self) -> None: - self._menu_bar = OwnerDrawnMenuBar( - self, - menus=[ - ( - "File", - [ - ("Save Buffer", self._save_wave_csv), - ("Record", self._toggle_wave_record), - "separator", - ("Clear", self._clear_capture_data), - "separator", - ("Exit", self._on_close), - ], - ), - ("Themes", []), - ], - theme_manager=self._theme_manager, - on_theme=self._apply_theme, + def create_worker(self, device, **callbacks): + return BleWorker( + device, + poll_cmd=CMD_READ, + init_cmd=CMD_ID, + notify_hook=_dm40_notify_filter, + write_buf_size=7, + **callbacks, ) - self._menu_bar.pack(fill=tk.X) - self._menu_bar.grid_columnconfigure(2, weight=1) - - root = tk.Frame(self, padx=12, pady=12) - root.pack(fill=tk.BOTH, expand=True) - root.columnconfigure(0, weight=1) + def build_menubar_labels(self, pre: tk.Frame, post: tk.Frame) -> None: self._model_var = tk.StringVar(value=MODEL.model_name) tk.Label( - self._menu_bar, textvariable=self._model_var, font=("Segoe UI", 11, "bold") - ).grid(row=0, column=3, sticky="e") - self._runtime_var = tk.StringVar(value="") - tk.Label(self._menu_bar, textvariable=self._runtime_var).grid( - row=0, column=4, sticky="w", padx=(0, 0) - ) + pre, textvariable=self._model_var, font=("Segoe UI", 11, "bold") + ).pack(side="left") self._icons_var = tk.StringVar(value="") - tk.Label(self._menu_bar, textvariable=self._icons_var).grid( - row=0, column=5, sticky="e", padx=(0, 0) - ) - self._battery_label = tk.Label( - self._menu_bar, - text="", - font=("Consolas", 10), - ) - self._battery_label.grid(row=0, column=6, sticky="e", padx=(0, 8)) - - self._status_var = tk.StringVar(value="Disconnected") - mid = tk.Frame(root) - mid.grid(row=1, column=0, sticky="nsew") - mid.columnconfigure(0, weight=1) - root.rowconfigure(1, weight=3) + tk.Label(post, textvariable=self._icons_var).pack(side="left") + self._battery_label = tk.Label(post, text="", font=("Consolas", 10)) + self._battery_label.pack(side="left", padx=(0, 8)) - reading = tk.Frame(mid) - reading.grid(row=0, column=0, sticky="ew") - reading.columnconfigure(0, weight=1) + def build_reading_area(self, parent: tk.Frame) -> None: + parent.columnconfigure(0, weight=0) + parent.columnconfigure(1, weight=1) self._mode_var = tk.StringVar(value="---") tk.Label( - reading, textvariable=self._mode_var, font=("Segoe UI", 12, "bold") + parent, textvariable=self._mode_var, font=("Segoe UI", 12, "bold") ).grid(row=0, column=0, sticky="w") self._value_label = ttk.Label( - reading, + parent, text="---", font=("Cascadia Mono", 44, "bold"), style="DM40.BigValue.TLabel", width=12, anchor="e", ) - self._value_label.grid(row=1, column=0, sticky="w") - self._aux_var = tk.StringVar(value="") - tk.Label(reading, textvariable=self._aux_var, font=("Segoe UI", 11)).grid( - row=2, column=0, columnspan=2, sticky="w" - ) - - self._wave_view = WaveformView(mid, colors=self.ui.theme, capacity=600) - self._wave_view.grid(row=1, column=0, sticky="nsew", pady=(10, 0)) - mid.rowconfigure(1, weight=1) - - self._stats_var = tk.StringVar(value="") - tk.Label(mid, textvariable=self._stats_var, font=("Consolas", 10), - anchor="w").grid(row=2, column=0, sticky="ew", pady=(0, 10)) - - raw_frame = tk.Frame(root) - raw_frame.grid(row=2, column=0, sticky="nsew") - raw_frame.columnconfigure(0, weight=1) - raw_frame.rowconfigure(1, weight=1) - root.rowconfigure(2, weight=1) - - tk.Label(raw_frame, text="Raw packets:").grid(row=0, column=0, sticky="w") - - self._raw_text = tk.Text( - raw_frame, - wrap="none", - height=10, - relief="flat", - highlightthickness=2, - undo=False, - maxundo=0, - autoseparators=False - ) - self._raw_text.grid(row=1, column=0, sticky="nsew", pady=(6, 0)) - - yscroll = AutoScrollbar( - raw_frame, - orient="vertical", - command=self._raw_text.yview, - style="Arrowless.Vertical.TScrollbar", - ) - yscroll.grid(row=1, column=1, sticky="ns", pady=(6, 0)) - self._raw_text.configure(yscrollcommand=yscroll.set) - self._raw_text.configure(state="disabled") - - device_panel = tk.Frame(raw_frame) - device_panel.grid(row=0, column=2, rowspan=2, sticky="nsw", padx=(12, 0)) - device_panel.rowconfigure(1, weight=1) - device_panel.columnconfigure(0, weight=0) - device_panel.columnconfigure(1, weight=1) - device_panel.columnconfigure(2, weight=0) - - tk.Label(device_panel, text="Devices:").grid(row=0, column=0, sticky="w") - status_label = tk.Label(device_panel, textvariable=self._status_var) - status_label.grid(row=0, column=1, columnspan=2, sticky="e") - self._device_listbox = tk.Listbox( - device_panel, - height=10, - width=58, - exportselection=False, - activestyle="none", - highlightthickness=2, - relief="flat", - ) - - def on_click(event): - index = self._device_listbox.nearest(event.y) - bbox = self._device_listbox.bbox(index) - if bbox is None or event.y > bbox[1] + bbox[3]: - return "break" - - self._device_listbox.bind("", on_click) - - self._device_listbox.grid( - row=1, column=0, columnspan=3, sticky="nsew", pady=(6, 6) - ) - scan_btn = ThemedButton(device_panel, text="Scan", command=self.scan_devices) - scan_btn.grid(row=2, column=0, sticky="w") - self._connect_btn = ThemedButton( - device_panel, text="Connect", command=self.connect - ) - self._connect_btn.grid(row=2, column=1, sticky="e", padx=(0, 6)) - self._disconnect_btn = ThemedButton( - device_panel, text="Disconnect", command=self.disconnect - ) - self._disconnect_btn.grid(row=2, column=2, sticky="e") - - self._find = FindPopup( - raw_frame, - self._raw_text, - self.ui.theme, - grid_opts={ - "row": 1, - "column": 0, - "sticky": "ne", - "padx": 6, - "pady": (10, 0), - }, - ) - - bar = tk.Frame(root) - bar.grid(row=3, column=0, sticky="w", pady=(8, 0)) - root.rowconfigure(3, weight=0) + self._aux1_var = tk.StringVar(value="") + self._aux2_var = tk.StringVar(value="") + aux_frame = tk.Frame(parent) + aux_frame.grid(row=2, column=0, sticky="e") + tk.Label( + aux_frame, + textvariable=self._aux1_var, + font=("Cascadia Mono", 12, "bold"), + width=12, + anchor="e", + ).pack(side="right", padx=(6, 0)) + tk.Label( + aux_frame, + textvariable=self._aux2_var, + font=("Cascadia Mono", 12, "bold"), + width=12, + anchor="e", + ).pack(side="right") + def build_control_bar(self, bar: tk.Frame) -> None: self._range_button = ttk.Button( bar, text="Range", style="MenuBar.TButton", command=self._show_range_menu, padding=6, - width=0 + width=0, ) self._range_button.pack(side=tk.LEFT, padx=(0, 6)) for label, cmd in MOMENTARY_COMMANDS: - btn = ttk.Button(bar, text=label, style="MenuBar.TButton", - command=lambda c=cmd: self._send_command_prefix(c), padding=6, width=0) + btn = ttk.Button( + bar, text=label, style="MenuBar.TButton", + command=lambda c=cmd: self.app.send_command(c), padding=6, width=0, + ) btn.pack(side=tk.LEFT, padx=(0, 6)) self._mode_buttons.append(btn) for label in ("AUTO", "HOLD"): var = self._toggle_vars[label] - btn = ttk.Checkbutton(bar, text=label, style="MenuBar.TCheckbutton", - variable=var, command=lambda key=label: self._on_toggle_clicked(key)) + btn = ttk.Checkbutton( + bar, text=label, style="MenuBar.TCheckbutton", + variable=var, command=lambda key=label: self._on_toggle_clicked(key), + ) btn.pack(side=tk.LEFT, padx=(0, 6)) self._mode_buttons.append(btn) def add_cycle_group(kind: str, key: str, options: tuple) -> None: var = tk.StringVar(value=options[0][0]) sel_var = tk.BooleanVar(value=False) - self._cycle_groups[key] = {"kind": kind, "options": options, - "label_var": var, "select_var": sel_var} - btn = ttk.Checkbutton(bar, textvariable=var, style="MenuBar.TCheckbutton", - variable=sel_var, command=lambda k=key: self._cycle_mode(k)) + self._cycle_groups[key] = { + "kind": kind, "options": options, + "label_var": var, "select_var": sel_var, + } + btn = ttk.Checkbutton( + bar, textvariable=var, style="MenuBar.TCheckbutton", + variable=sel_var, command=lambda k=key: self._cycle_mode(k), + ) btn.pack(side=tk.LEFT, padx=(0, 6)) self._mode_buttons.append(btn) @@ -352,36 +159,99 @@ def add_cycle_group(kind: str, key: str, options: tuple) -> None: add_cycle_group("range", key, options) var = self._toggle_vars["CAP"] - btn = ttk.Checkbutton(bar, text="CAP", style="MenuBar.TCheckbutton", - variable=var, command=lambda: self._on_toggle_clicked("CAP")) + btn = ttk.Checkbutton( + bar, text="CAP", style="MenuBar.TCheckbutton", + variable=var, command=lambda: self._on_toggle_clicked("CAP"), + ) btn.pack(side=tk.LEFT, padx=(0, 6)) self._mode_buttons.append(btn) for key, options in COMMAND_CYCLE_GROUPS: add_cycle_group("command", key, options) - self._set_control_state(False) + self.set_control_state(False) - self.protocol("WM_DELETE_WINDOW", self._on_close) + self.app.bind_all("", self._copy_reading) - def _apply_theme(self, theme: ThemePalette) -> None: - self.ui.use_theme(theme) - theme_title_bar(self, border_color=theme.outline, caption_color=theme.bg) - - self._wave_view.set_colors(self.ui.theme) - self._find.set_tag_colors(self.ui.theme) - - def _set_control_state(self, enabled: bool) -> None: + def set_control_state(self, enabled: bool) -> None: state = "!disabled" if enabled else "disabled" for btn in self._mode_buttons: btn.state([state]) if self._range_button is not None: self._range_button.state([state]) - if not enabled and self._range_menu is not None: self._range_menu.destroy() self._range_menu = None + def pre_connect_reset(self) -> None: + self._last_trace_key = None + + def clear_capture(self) -> None: + self._last_trace_key = None + + def on_connected(self) -> None: + self._model_var.set(MODEL.model_name) + + def teardown(self) -> None: + self.app.unbind_all("") + + def refresh_status(self, now: float) -> None: + status = self._last_device_status + self._icons_var.set( + ("⚡" if status[1] else "") + + ("🔒" if status[2] else "") + + ("✋" if status[3] else "") + ) + charging = status[1] + target_fg = "#00ff00" if charging else self.app.option_get("foreground", ".") + if self._battery_label.cget("foreground") != target_fg: + self._battery_label.configure(foreground=target_fg) + segments = max(0, min(5, int(status[0]))) + self._battery_label.configure(text="█" * segments + "░" * (5 - segments)) + + def on_packet(self, data: bytes) -> None: + app = self.app + m = parse_measurement_for_ui(data) + app._append_raw_text(f"RX {m.raw} CRC:{'PASS' if m.crc_ok else 'FAIL'}\n") + + if m.kind == "---": + return + + self._last_device_status = parse_device_status(data) + self._last_measurement = m + self._apply_meter_state() + + trace_key = data[5] + if self._last_trace_key is not None and trace_key != self._last_trace_key: + app._wave_view.clear() + app._stats_count = 0 + self._last_trace_key = trace_key + + self._mode_var.set(f"{m.kind} {m.range}" if m.range else m.kind) + unit = f" {m.display_unit}" if m.display_unit else "" + self._value_label.configure(text=f" {m.value_str}{unit}") + + third = f"{m.third_val} {m.third_unit}".strip() if m.third_val else "" + sec = f"{m.sec_val} {m.sec_unit}".strip() if m.sec_val else "" + self._aux1_var.set(sec) + self._aux2_var.set(third) + + mul = UNIT_TO_BASE[m.display_unit] + if not m.overload and m.norm_value is not None: + app._wave_view.push( + m.norm_value, pad=m.vertical_pad, axis_unit=m.display_unit, + axis_mul=mul, decimals=m.decimals, + ) + smin, smax, avg = app._push_stats(m.norm_value) + d = m.decimals + app._stats_var.set( + "Min %.*f Max %.*f Avg %.*f%s" + % (d, smin / mul, d, smax / mul, d, avg / mul, unit) + ) + + if app._is_connected: + app._rate_count += 1 + def _apply_meter_state(self) -> None: m = self._last_measurement if not m: @@ -391,9 +261,8 @@ def _apply_meter_state(self) -> None: group["select_var"].set(False) return - status = self._last_device_status self._toggle_vars["AUTO"].set((m.range or "").startswith("AUTO")) - self._toggle_vars["HOLD"].set(status[3]) + self._toggle_vars["HOLD"].set(self._last_device_status[3]) self._toggle_vars["CAP"].set(m.kind == "CAP") for group in self._cycle_groups.values(): @@ -402,7 +271,6 @@ def _apply_meter_state(self) -> None: kind = m.kind if kind in RANGE_KIND_TO_GROUP: self._select_cycle_group(RANGE_KIND_TO_GROUP[kind], kind) - if kind in COMMAND_KIND_TO_GROUP: label = COMMAND_KIND_LABELS[kind] if kind in COMMAND_KIND_LABELS else kind self._select_cycle_group(COMMAND_KIND_TO_GROUP[kind], label) @@ -414,18 +282,6 @@ def _select_cycle_group(self, key: str, label: str) -> None: group["label_var"].set(label) group["select_var"].set(True) - def _send_command_prefix(self, cmd_prefix: bytes) -> None: - if not self._worker or not self._worker.alive: - show_error( - self, - "Command", - "Connect to a device before sending commands.", - theme=(self.ui.theme.bg, self.ui.theme.outline), - ) - return - payload = _build_command_packet(cmd_prefix) - self._worker.set_command(payload) - def _cycle_mode(self, key: str) -> None: group = self._cycle_groups[key] options = group["options"] @@ -440,34 +296,30 @@ def _cycle_mode(self, key: str) -> None: current_index = (current_index + 1) % len(options) value = options[current_index][1] if isinstance(value, bytes): - self._send_command_prefix(value) + self.app.send_command(value) elif group["kind"] == "range" and isinstance(value, int): self._send_range_flag(value) - self._apply_meter_state() def _on_toggle_clicked(self, key: str) -> None: is_on = self._toggle_vars[key].get() cmd_on, cmd_off = _TOGGLE_COMMANDS_MAP[key] - if key == "CAP": self._toggle_vars[key].set(True) if cmd_on: - self._send_command_prefix(cmd_on) + self.app.send_command(cmd_on) elif is_on and cmd_on: - self._send_command_prefix(cmd_on) + self.app.send_command(cmd_on) elif not is_on: if cmd_off: - self._send_command_prefix(cmd_off) + self.app.send_command(cmd_off) elif self._last_base_mode_flag is not None: self._send_range_flag(self._last_base_mode_flag) - self._apply_meter_state() def _send_range_flag(self, flag: int) -> None: self._last_base_mode_flag = flag - cmd_prefix = b"\xaf\x05\x03\x06\x01%c" % flag - self._send_command_prefix(cmd_prefix) + self.app.send_command(b"\xaf\x05\x03\x06\x01%c" % flag) def _get_active_range_kind(self) -> str | None: if self._last_measurement: @@ -479,7 +331,6 @@ def _get_active_range_kind(self) -> str | None: def _build_range_items(self, kind: str | None) -> list: if not kind or kind not in _RANGE_ITEMS_BY_KIND: return [("No ranges", None)] - return [ (rng, lambda f=flag: self._send_range_flag(f)) for rng, flag in _RANGE_ITEMS_BY_KIND[kind] @@ -490,316 +341,25 @@ def _show_range_menu(self) -> None: self._range_menu.destroy() self._range_menu = None return - if self._range_button is None: return btn = self._range_button active_kind = self._get_active_range_kind() - root = self.winfo_toplevel() self._range_menu = MenuDropdown( - root, + self.app, self._build_range_items(active_kind), - on_destroy=lambda: setattr(self, '_range_menu', None), + on_destroy=lambda: setattr(self, "_range_menu", None), owner_widget=btn, direction=MENU_UP, ) - def _ensure_radio_available(self, title: str) -> bool: - if NanoScanner.radio_state() != "off": - return True - - message = "Bluetooth radio is OFF. Turn Bluetooth on and try again." - self._status_var.set(message) - show_error( - self, - title, - message, - theme=(self.ui.theme.bg, self.ui.theme.outline), - ) - return False - - def connect(self) -> None: - if self._scan_in_progress: - self._scan_generation += 1 - self._scan_in_progress = False - cancel_scan, self._scan_cancel = self._scan_cancel, None - if cancel_scan is not None: - try: - cancel_scan() - except Exception: - pass - if not self._ensure_radio_available("Connect"): - return - - selection = self._device_listbox.curselection() - device = self._devices[selection[0]] if selection else None - if device is None: - show_error( - self, - "Connect", - "Select a device from the list.", - theme=(self.ui.theme.bg, self.ui.theme.outline), - ) - return - if self._worker: - if self._worker.alive: - return - self._worker = None - - self._last_trace_key = None - self._wave_view.clear() - label = device.name or device.address - self._status_var.set(f"Connecting to {label} …") - - def _ui(method): - def _dispatch(*a): - self.after(0, method, *a) - return _dispatch - - self._worker = BleWorker( - device, - on_packet=_ui(self._on_worker_packet), - on_tx=_ui(self._on_worker_tx), - on_status=_ui(self._on_worker_status), - on_error=_ui(self._on_worker_error), - on_connected=_ui(self._on_worker_connected), - on_disconnected=_ui(self._on_worker_disconnected), - ) - - def scan_devices(self) -> None: - if self._scan_in_progress: - return - if not self._ensure_radio_available("Scan"): - return - self._scan_generation += 1 - scan_id = self._scan_generation - self._scan_in_progress = True - self._scan_cancel = None - self._devices = [] - self._device_index_by_address.clear() - self._device_listbox.delete(0, tk.END) - self._status_var.set("Scanning for devices …") - _thread.start_new_thread(self._scan_worker, (scan_id,)) - - def _scan_worker(self, scan_id: int) -> None: - def on_device(device) -> None: - self.after(0, self._scan_add_device, scan_id, device) - - def register_cancel(cancel_scan) -> None: - if scan_id == self._scan_generation: - self._scan_cancel = cancel_scan - else: - cancel_scan() - - try: - devices = asyncio.run( - NanoScanner.discover( - scanning_mode="active", - on_device=on_device, - timeout=3.0, - on_cancel_register=register_cancel, - require_radio_check=False, - ) - ) - except Exception as exc: - self.after(0, self._scan_failed, scan_id, exc) - return - self.after(0, self._scan_complete, scan_id, devices or []) - - def _scan_add_device(self, scan_id: int, device) -> None: - if scan_id != self._scan_generation: - return - addr = device.address - if not addr: - return - name = device.name or "Unknown" - - existing_index = self._device_index_by_address.get(addr) - if existing_index is not None: - existing_device = self._devices[existing_index] - existing_device.name = device.name - - current_label = self._device_listbox.get(existing_index) - desired_label = f"{name} ({addr})" - # Refresh row when new packets provide a better name for an existing address. - if name != "Unknown" and current_label != desired_label: - self._device_listbox.delete(existing_index) - self._device_listbox.insert(existing_index, desired_label) - return - - self._device_index_by_address[addr] = len(self._devices) - self._devices.append(device) - self._device_listbox.insert(tk.END, f"{name} ({addr})") - - def _scan_failed(self, scan_id: int, exc: Exception) -> None: - if scan_id != self._scan_generation: - return - self._scan_in_progress = False - self._scan_cancel = None - self._status_var.set("Scan failed") - show_error( - self, - "Scan", - f"BLE scan failed: {exc!r}", - theme=(self.ui.theme.bg, self.ui.theme.outline), - ) - - def _scan_complete(self, scan_id: int, devices: list) -> None: - if scan_id != self._scan_generation: + def _copy_reading(self, _event=None) -> None: + source = _event.widget if _event is not None else self.app.focus_get() + if isinstance(source, (tk.Text, tk.Entry, ttk.Entry)): return - self._scan_in_progress = False - self._scan_cancel = None - for device in devices: - self._scan_add_device(scan_id, device) - - count = len(self._devices) - self._status_var.set(f"Found {count} device{'s' if count != 1 else ''}") - - def disconnect(self) -> None: - if self._worker: - self._worker.stop() - self._worker = None - self._status_var.set("Disconnected") - self._is_connected = False - self._rate_count = 0 - self._set_control_state(False) - self._refresh_status_bar() - - def _on_close(self) -> None: - self.disconnect() - self.destroy() - - def _on_worker_packet(self, payload: bytes) -> None: - self._apply_packet(payload) - self._refresh_status_bar() - - def _on_worker_tx(self, payload: bytes) -> None: - raw = payload.hex(" ").upper() - self._append_raw_text(f"TX {raw}\n") - self._refresh_status_bar() - - def _on_worker_status(self, message: str) -> None: - self._status_var.set(message) - self._refresh_status_bar() - - def _on_worker_connected(self, _address: str) -> None: - self._is_connected = True - self._model_var.set(MODEL.model_name) - self._rate_count = 0 - self._rate_start = time.monotonic() - self._set_control_state(True) - self._refresh_status_bar() - - def _on_worker_disconnected(self, _address: str) -> None: - self._is_connected = False - self._rate_count = 0 - self._status_var.set("Disconnected") - self._worker = None - self._set_control_state(False) - self._refresh_status_bar() - - def _on_worker_error(self, message: str) -> None: - self._status_var.set(message) - show_error( - self, "DM40", message, theme=(self.ui.theme.bg, self.ui.theme.outline) - ) - self._set_control_state(False) - self._refresh_status_bar() - - def _append_raw_text(self, text: str) -> None: - self._raw_text.configure(state="normal") - self._raw_text.insert("end", text) - self._raw_text.see("end") - self._raw_text.configure(state="disabled") - - def _apply_packet(self, data: bytes) -> None: - m = parse_measurement_for_ui(data) - - self._append_raw_text(f"RX {m.raw} CRC:{'PASS' if m.crc_ok else 'FAIL'}\n") - - if m.kind == "---": + m = self._last_measurement + if not m or m.kind == "---": return - - self._last_device_status = parse_device_status(data) - self._last_measurement = m - self._apply_meter_state() - - trace_key = data[5] - if self._last_trace_key is not None and trace_key != self._last_trace_key: - self._wave_view.clear() - self._stats_count = 0 - self._last_trace_key = trace_key - - self._mode_var.set(f"{m.kind} {m.range}" if m.range else m.kind) unit = f" {m.display_unit}" if m.display_unit else "" - self._value_label.configure(text=f" {m.value_str}{unit}") - - aux = [ - f"{val} {u}".strip() - for val, u in ((m.sec_val, m.sec_unit), (m.third_val, m.third_unit)) - if val - ] - self._aux_var.set(" ".join(aux)) - mul = UNIT_TO_BASE.get(m.display_unit, 1.0) - - if not m.overload and m.norm_value is not None: - self._wave_view.push( - m.norm_value, - pad=m.vertical_pad, - axis_unit=m.display_unit, - axis_mul=mul, - decimals=m.decimals, - ) - v = m.norm_value - if self._stats_count == 0: - self._stats_min = self._stats_max = self._stats_sum = v - else: - if v < self._stats_min: - self._stats_min = v - if v > self._stats_max: - self._stats_max = v - self._stats_sum += v - self._stats_count += 1 - avg = self._stats_sum / self._stats_count - d = m.decimals - self._stats_var.set( - "Min %.*f Max %.*f Avg %.*f%s" - % (d, self._stats_min / mul, d, self._stats_max / mul, d, avg / mul, unit) - ) - - if self._is_connected: - self._rate_count += 1 - - def _refresh_status_bar(self) -> None: - now = time.monotonic() - elapsed = int(now - self._start_time) - h = elapsed // 3600 - m = (elapsed % 3600) // 60 - s = elapsed % 60 - run_s = "RUN %02d:%02d:%02d" % (h, m, s) - self._runtime_var.set(run_s) - - status = self._last_device_status - self._icons_var.set(("⚡" if status[1] else "") + ("🔒" if status[2] else "") + ("✋" if status[3] else "")) - - charging = status[1] - target_fg = "#00ff00" if charging else self.option_get("foreground", ".") - if self._battery_label.cget("foreground") != target_fg: - self._battery_label.configure(foreground=target_fg) - - segments = max(0, min(5, int(status[0]))) - self._battery_label.configure(text="█" * segments + "░" * (5 - segments)) - if self._is_connected: - dt = now - self._rate_start - title_rate = self._rate_count / dt if dt > 0 else 0.0 - if dt >= 2.0: - self._rate_count = 0 - self._rate_start = now - title = f"{self._title_base} - {title_rate:.1f} samples/s" - else: - title = self._title_base - if self._wave_view.paused: - title += " \u23F8 PAUSED" - if self._wave_view.recording: - title += " \u23FA REC" - self.title(title) \ No newline at end of file + self.app.clipboard_clear() + self.app.clipboard_append(f"{m.value_str}{unit}") diff --git a/dm40/ble_worker.py b/dm40/ble_worker.py deleted file mode 100644 index dc206f6..0000000 --- a/dm40/ble_worker.py +++ /dev/null @@ -1,187 +0,0 @@ -"""BLE transport worker for DM40.""" - -import _thread -import time - -from . import mini_asyncio as asyncio - -from .nanowinbt.client import NanoClient - -from .parsing import MODEL, MODEL_TABLE -from .protocol_constants import ( - CMD_ID, - CMD_READ, - NOTIFY_UUID, - SERVICE_UUID, - WRITE_UUID, -) - - -class BleWorker: - __slots__ = ( - "device", "_on_packet", "_on_tx", "_on_status", "_on_error", - "_on_connected", "_on_disconnected", "_stopping", "alive", - "_pending_cmd", "_loop", "_io_event", - ) - MODEL_PREFIX = b"\xdf\x05\x03\x08\x14" - - def __init__( - self, - device, - *, - on_packet=None, - on_tx=None, - on_status=None, - on_error=None, - on_connected=None, - on_disconnected=None, - ): - self.device = device - self._on_packet = on_packet - self._on_tx = on_tx - self._on_status = on_status - self._on_error = on_error - self._on_connected = on_connected - self._on_disconnected = on_disconnected - self._stopping = False - self.alive = False - self._pending_cmd: bytes | None = None - self._loop: asyncio.AbstractEventLoop | None = None - self._io_event: asyncio.Event | None = None - - self.alive = True - try: - _thread.start_new_thread(self.run, ()) - except Exception: - self.alive = False - raise - - def stop(self) -> None: - self._stopping = True - self._wake() - - def _wake(self) -> None: - loop = self._loop - io_event = self._io_event - if loop is not None and io_event is not None: - loop.call_soon_threadsafe(io_event.set) - - def set_command(self, payload: bytes) -> None: - self._pending_cmd = payload - self._wake() - - def run(self) -> None: - loop = asyncio.new_event_loop() - self._loop = loop - try: - loop.run_until_complete(self._main()) - except Exception as exc: - self._emit(self._on_error, f"Worker failed: {exc!r}") - finally: - self._loop = None - self._io_event = None - loop.close() - self.alive = False - - @staticmethod - def _emit(callback, *args) -> None: - if callback is not None: - callback(*args) - - async def _main(self) -> None: - device = self.device - address = device.address - - last_rx = 0.0 - no_data_emitted = False - disconnected = False - disconnect_notified = False - read_ready = True - io_event = asyncio.Event() - self._io_event = io_event - - def _notify_disconnect() -> None: - nonlocal disconnect_notified - if disconnect_notified: - return - disconnect_notified = True - self._emit(self._on_status, f"disconnected — {address}") - self._emit(self._on_disconnected, address) - - def on_notify(data: bytes) -> None: - nonlocal last_rx, no_data_emitted, read_ready - last_rx = time.monotonic() - no_data_emitted = False - read_ready = True - io_event.set() - - if data.startswith(self.MODEL_PREFIX): - idx = data[9] - ord("A") - if 0 <= idx < len(MODEL_TABLE): - MODEL.model_name, MODEL.device_counts = MODEL_TABLE[idx] - return - - self._emit(self._on_packet, data) - - def on_disconnect() -> None: - nonlocal disconnected - disconnected = True - self._wake() - _notify_disconnect() - - self._emit(self._on_status, f"connecting — {address}") - async with NanoClient(device, disconnected_callback=on_disconnect) as client: - await client.prime_gatt(SERVICE_UUID, [NOTIFY_UUID, WRITE_UUID]) - await client.start_notify( - NOTIFY_UUID, - on_notify, - ) - - try: - await client.write_gatt_char(WRITE_UUID, CMD_ID) - except Exception as exc: - self._emit(self._on_error, f"ID request failed: {exc!r}") - return - self._emit(self._on_status, f"connected — {address}") - self._emit(self._on_connected, address) - self._emit(self._on_tx, CMD_ID) - io_event.set() - - try: - while not self._stopping: - if disconnected: - _notify_disconnect() - break - - await io_event.wait() - io_event.clear() - if self._stopping or disconnected: - break - - if read_ready: - cmd = self._pending_cmd - self._pending_cmd = None - payload = cmd if cmd is not None else CMD_READ - - try: - await client.write_gatt_char(WRITE_UUID, payload) - except Exception as exc: - self._emit(self._on_error, f"{'Write' if cmd else 'Read'} failed: {exc!r}") - return - - if cmd is not None: - self._emit(self._on_tx, payload) - read_ready = False - - if last_rx and not no_data_emitted and (time.monotonic() - last_rx) > 2.0: - self._emit(self._on_status, f"connected — {address} — (no data)") - no_data_emitted = True - - finally: - try: - await client.stop_notify( - NOTIFY_UUID, - ) - except Exception: - pass - _notify_disconnect() diff --git a/dm40/parsing.py b/dm40/parsing.py index 36afa8d..b80e0e3 100644 --- a/dm40/parsing.py +++ b/dm40/parsing.py @@ -51,37 +51,21 @@ class Measurement: "crc_ok", ) - def __init__( - self, - raw="", - kind="---", - range=None, - display_unit="", - value_str="---", - norm_value=None, - vertical_pad=0.0, - decimals=2, - sec_val=None, - sec_unit="", - third_val=None, - third_unit="", - overload=False, - crc_ok=False, - ): - self.raw = raw - self.kind = kind - self.range = range - self.display_unit = display_unit - self.value_str = value_str - self.norm_value = norm_value - self.vertical_pad = vertical_pad - self.decimals = decimals - self.sec_val = sec_val - self.sec_unit = sec_unit - self.third_val = third_val - self.third_unit = third_unit - self.overload = overload - self.crc_ok = crc_ok + def __init__(self): + self.raw = "" + self.kind = "---" + self.range = "" + self.display_unit = "" + self.value_str = "---" + self.norm_value = None # type: ignore[assignment] + self.vertical_pad = 0.0 + self.decimals = 2 + self.sec_val = "" + self.sec_unit = "" + self.third_val = "" + self.third_unit = "" + self.overload = False + self.crc_ok = False MODEL_TABLE = (("DM40A", 40000), ("DM40B", 50000), ("DM40C", 60000)) @@ -89,24 +73,29 @@ def __init__( def resolve_slot_scale(slot: str, kind: str, sign_flag: int): scale_flag = sign_flag & 0xFE + if slot == "FREQ": return FREQ_SCALE_MAP[scale_flag] - factor = MODEL.device_counts / 60000.0 - if kind == "CAP" and slot == "M1": - info = CAP_SCALE_MAP[scale_flag] - elif slot in ("M1", "COMB", "DC", "AC") and (kind.startswith("V") or kind == "DIODE"): - info = ALT_SCALE_MAP[scale_flag] - elif slot in ("M1", "COMB", "DC", "AC") and kind.startswith("A"): - info = AMP_SCALE_MAP[scale_flag] - elif slot == "M1" and kind in ("RES", "RES_ONLINE", "CONT"): - info = RES_SCALE_MAP[scale_flag] + + if slot in ("M1", "DC", "AC"): + if kind.startswith("V") or kind == "DIODE": + info = ALT_SCALE_MAP[scale_flag] + elif kind.startswith("A"): + info = AMP_SCALE_MAP[scale_flag] + elif kind in ("RES", "RES_ONLINE", "CONT"): + info = RES_SCALE_MAP[scale_flag] + elif kind == "CAP": + info = CAP_SCALE_MAP[scale_flag] + else: + return None elif slot == "TC" and kind == "TEMP": info = (6000.0, "°C", 1.0, 1) elif slot == "RES" and kind == "DIODE": - return 6000.0 * factor, "Ω", 1.0, 1 + info = (6000.0, "Ω", 1.0, 1) else: return None + factor = MODEL.device_counts / 60000.0 fs_base, unit, mul, dec = info return fs_base * factor, unit, mul, dec @@ -122,22 +111,27 @@ def parse_device_status(data: bytes) -> tuple: def process_slot(slot_type: str, counts: int, sign_flag: int, kind: str): - sign = -1 if (sign_flag & 0x01) else 1 + ol = counts == 0xFFFF - if not (resolved := resolve_slot_scale(slot_type, kind, sign_flag)): - if slot_type in ("DUTY", "TF", "TI"): - val = counts * 0.1 - return f"{val:.1f}", "%" if slot_type == "DUTY" else ("°F" if slot_type == "TF" else "°C") - return "", "" + if resolved := resolve_slot_scale(slot_type, kind, sign_flag): + full_scale, disp_unit, disp_mul, decimals = resolved + if ol: + return "OL", disp_unit + sign = -1 if (sign_flag & 1) else 1 + val_disp = counts * (full_scale / MODEL.device_counts) * disp_mul * sign + return "%.*f" % (decimals, val_disp), disp_unit - full_scale, disp_unit, disp_mul, decimals = resolved - scale = full_scale / MODEL.device_counts - val_disp = (counts * scale * disp_mul) * sign - return f"{val_disp:.{decimals}f}", disp_unit + if slot_type in ("DUTY", "TF", "TI"): + val_str = "OL" if ol else "%.1f" % (counts * 0.1) + if slot_type == "DUTY": + return val_str, "%" + return val_str, "°F" if slot_type == "TF" else "°C INT" + return "", "" def parse_measurement_for_ui(data: bytes) -> Measurement: - m = Measurement(raw=data.hex(" ").upper()) + m = Measurement() + m.raw = data.hex(" ").upper() m.crc_ok = ((sum(data) & 0xFF) == 0) if data else False if len(data) < 16 or not data.startswith(HEADER): return m @@ -165,15 +159,15 @@ def parse_measurement_for_ui(data: bytes) -> Measurement: if not m.overload: sign = -1 if (s0 & 0x01) else 1 - m.norm_value = sign * m1 * (fs1 / eff_counts) - m.value_str = f"{m.norm_value * mul1:.{dec1}f}" + m.norm_value = sign * m1 * (fs1 / eff_counts) # type: ignore[assignment] + m.value_str = "%.*f" % (dec1, m.norm_value * mul1) # type: ignore[operator] if not rng_name.startswith("AUTO"): m.range = f"{(fs1 * mul1):.4g}{unit1}" - if len(slots) > 1 and m2 != 0xFFFF: + if len(slots) > 1: m.sec_val, m.sec_unit = process_slot(slots[1], m2, s1, kind) - if len(slots) > 2 and m3 != 0xFFFF: + if len(slots) > 2: m.third_val, m.third_unit = process_slot(slots[2], m3, s2, kind) return m diff --git a/dm40/protocol_constants.py b/dm40/protocol_constants.py index 00ff6c1..14ecb34 100644 --- a/dm40/protocol_constants.py +++ b/dm40/protocol_constants.py @@ -1,8 +1,5 @@ """Protocol constants and command maps for DM40.""" -NOTIFY_UUID = "0000fff1-0000-1000-8000-00805f9b34fb" -WRITE_UUID = "0000fff3-0000-1000-8000-00805f9b34fb" -SERVICE_UUID = "0000fff0-0000-1000-8000-00805f9b34fb" CMD_ID = b"\xaf\x05\x03\x08\x00\x41" CMD_READ = b"\xaf\x05\x03\x09\x00\x40" HEADER = b"\xdf\x05\x03\x09" @@ -212,7 +209,6 @@ } UNIT_TO_BASE = { - "uV": 1e-6, "mV": 1e-3, "V": 1.0, "uA": 1e-6, @@ -226,7 +222,6 @@ "nF": 1e-9, "uF": 1e-6, "mF": 1e-3, - "F": 1.0, "°C": 1.0, "°F": 1.0, "%": 1.0, @@ -235,10 +230,10 @@ MODE_SLOT_MAP = { "VDC": ("M1",), "VAC": ("M1", "DUTY", "FREQ"), - "VDC+AC": ("COMB", "DC", "AC"), + "VDC+AC": ("M1", "DC", "AC"), "ADC": ("M1",), "AAC": ("M1", "DUTY", "FREQ"), - "ADC+AC": ("COMB", "DC", "AC"), + "ADC+AC": ("M1", "DC", "AC"), "RES": ("M1",), "RES_ONLINE": ("M1",), "CAP": ("M1",), diff --git a/el15/app.py b/el15/app.py new file mode 100644 index 0000000..959e718 --- /dev/null +++ b/el15/app.py @@ -0,0 +1,216 @@ +"""EL15 device handler.""" +import tkinter as tk +from tkinter import ttk + +from shared.ble_worker import BleWorker +from GUI.themed_messagebox import show_error + +from .protocol_constants import ( + EL15Status, + HEADER, + POLL_PKT, + CMD_LOAD_OFF, + CMD_LOAD_ON, + CMD_MODE_CC, + CMD_MODE_CV, + CMD_MODE_CR, + CMD_MODE_CP, + MODE_CC, MODE_CV, MODE_CR, MODE_CP, + MODE_SETPOINT_INFO, + build_set_setpoint_cmd, + parse_status_packet, +) + + +_MODE_CYCLE = ( + ("CC", MODE_CC, CMD_MODE_CC), + ("CV", MODE_CV, CMD_MODE_CV), + ("CR", MODE_CR, CMD_MODE_CR), + ("CP", MODE_CP, CMD_MODE_CP), +) + + +def _el15_notify_filter(data: bytes) -> bool: + return data[:4] == HEADER + + +class EL15Handler: + title = "EL15" + csv_prefix = "EL15" + + def __init__(self, app) -> None: + self.app = app + self._last_status: EL15Status | None = None + self._mode_buttons: dict[int, ttk.Checkbutton] = {} + self._mode_vars: dict[int, tk.BooleanVar] = {} + self._load_var = tk.BooleanVar(value=False) + self._active_mode = MODE_CC + self._all_controls: list = [] + + def create_worker(self, device, **callbacks): + return BleWorker( + device, + poll_cmd=POLL_PKT, + notify_hook=_el15_notify_filter, + write_buf_size=10, + **callbacks, + ) + + def build_menubar_labels(self, pre: tk.Frame, post: tk.Frame) -> None: + tk.Label(pre, text="EL15", font=("Segoe UI", 11, "bold")).pack(side="left") + self._mode_label_var = tk.StringVar(value="") + tk.Label(post, textvariable=self._mode_label_var).pack( + side="left", padx=(0, 8) + ) + + def build_reading_area(self, parent: tk.Frame) -> None: + readings = tk.Frame(parent) + readings.pack(fill=tk.X) + readings.columnconfigure(0, weight=1) + readings.columnconfigure(1, weight=1) + readings.columnconfigure(2, weight=1) + + def _make_cell(col: int, header: str) -> ttk.Label: + cell = tk.Frame(readings) + cell.grid(row=0, column=col, sticky="ew", padx=(0, 12 if col < 2 else 0)) + tk.Label(cell, text=header, font=("Segoe UI", 11)).pack(anchor="w") + val_lbl = ttk.Label( + cell, text="---", font=("Cascadia Mono", 36, "bold"), + style="DM40.BigValue.TLabel", anchor="e", width=10, + ) + val_lbl.pack(fill="x") + return val_lbl + + self._volt_label = _make_cell(0, "Voltage") + self._amp_label = _make_cell(1, "Current") + self._watt_label = _make_cell(2, "Power") + + info_bar = tk.Frame(parent) + info_bar.pack(fill=tk.X, pady=(6, 0)) + + self._info_mode_var = tk.StringVar(value="Mode: ---") + self._info_load_var = tk.StringVar(value="Load: OFF") + self._info_setp_var = tk.StringVar(value="Setpoint: ---") + self._info_runtime_var = tk.StringVar(value="Runtime: --:--:--") + self._info_temp_var = tk.StringVar(value="Temp: ---") + self._info_fan_var = tk.StringVar(value="Fan: -") + + for col, var in enumerate(( + self._info_mode_var, self._info_load_var, self._info_setp_var, + self._info_runtime_var, self._info_temp_var, self._info_fan_var, + )): + tk.Label(info_bar, textvariable=var, font=("Consolas", 10)).grid( + row=0, column=col, sticky="w", padx=(0, 18) + ) + + def build_control_bar(self, bar: tk.Frame) -> None: + for label, mode_val, cmd in _MODE_CYCLE: + var = tk.BooleanVar(value=False) + self._mode_vars[mode_val] = var + btn = ttk.Checkbutton( + bar, text=label, style="MenuBar.TCheckbutton", + variable=var, + command=lambda m=mode_val, c=cmd: self._on_mode_clicked(m, c), + ) + btn.pack(side=tk.LEFT, padx=(0, 6)) + self._mode_buttons[mode_val] = btn + + self._load_btn = ttk.Checkbutton( + bar, text="Load", style="MenuBar.TCheckbutton", + variable=self._load_var, command=self._on_load_clicked, + ) + self._load_btn.pack(side=tk.LEFT, padx=(0, 18)) + + tk.Label(bar, text="Setpoint:").pack(side=tk.LEFT, padx=(0, 4)) + self._setpoint_var = tk.StringVar(value="") + self._setpoint_entry = ttk.Entry(bar, textvariable=self._setpoint_var, width=10) + self._setpoint_entry.pack(side=tk.LEFT, padx=(0, 4)) + self._setpoint_entry.bind("", self._on_set_setpoint) + self._setpoint_unit_var = tk.StringVar(value="A") + tk.Label(bar, textvariable=self._setpoint_unit_var, width=2).pack( + side=tk.LEFT, padx=(0, 6) + ) + ttk.Button(bar, text="Set", style="MenuBar.TButton", + command=self._on_set_setpoint, padding=6, width=0).pack(side=tk.LEFT) + + self._all_controls = [ + *self._mode_buttons.values(), + self._load_btn, + self._setpoint_entry, + ] + self.set_control_state(False) + + def set_control_state(self, enabled: bool) -> None: + state = "!disabled" if enabled else "disabled" + for widget in self._all_controls: + widget.state([state]) + + def pre_connect_reset(self) -> None: pass + def clear_capture(self) -> None: pass + def on_connected(self) -> None: pass + def teardown(self) -> None: pass + def refresh_status(self, now: float) -> None: pass + + def on_packet(self, data: bytes) -> None: + app = self.app + s = parse_status_packet(data) + app._append_raw_text(f"RX {s.raw} CRC:{'PASS' if s.crc_ok else 'FAIL'}\n") + + if not s.valid: + return + + self._last_status = s + self._apply_status_buttons(s) + + self._volt_label.configure(text=f"{s.voltage:8.3f} V") + self._amp_label.configure(text=f"{s.current:8.3f} A") + self._watt_label.configure(text=f"{s.power:8.3f} W") + + self._info_mode_var.set(f"Mode: {s.mode_name}") + self._info_load_var.set(f"Load: {'ON' if s.load_on else 'OFF'}") + self._info_setp_var.set( + f"{s.setpoint_label}: {s.setpoint:.{s.setpoint_decimals}f} {s.setpoint_unit}" + ) + rs = s.runtime + self._info_runtime_var.set( + "Runtime: %02d:%02d:%02d" % (rs // 3600, (rs % 3600) // 60, rs % 60) + ) + self._info_temp_var.set(f"Temp: {s.temperature:.1f}°C") + self._info_fan_var.set(f"Fan: {s.fan_speed}") + self._mode_label_var.set(f"{s.mode_name} {'▶ ON' if s.load_on else '◼ OFF'}") + + app._wave_view.push(s.voltage, pad=0.5, axis_unit="V", axis_mul=1.0, decimals=3) + + smin, smax, avg = app._push_stats(s.voltage) + app._stats_var.set("V Min %.3f Max %.3f Avg %.3f" % (smin, smax, avg)) + + if app._is_connected: + app._rate_count += 1 + + def _apply_status_buttons(self, s: EL15Status) -> None: + for mode_val, var in self._mode_vars.items(): + var.set(s.mode == mode_val) + self._active_mode = s.mode + self._load_var.set(s.load_on) + info = MODE_SETPOINT_INFO[s.mode] + self._setpoint_unit_var.set(info[0]) + + def _on_mode_clicked(self, mode_val: int, cmd_prefix: bytes) -> None: + for m, var in self._mode_vars.items(): + var.set(m == mode_val) + self._active_mode = mode_val + info = MODE_SETPOINT_INFO.get(mode_val, ("A", 3, "Current")) + self._setpoint_unit_var.set(info[0]) + self.app.send_command(cmd_prefix) + + def _on_load_clicked(self) -> None: + self.app.send_command(CMD_LOAD_ON if self._load_var.get() else CMD_LOAD_OFF) + + def _on_set_setpoint(self, _event=None) -> None: + try: + value = float(self._setpoint_var.get().strip()) + except ValueError: + show_error(self.app, "Setpoint", "Enter a valid numeric value.", + theme=(self.app.ui.theme.bg, self.app.ui.theme.outline)) + return + self.app.send_command(build_set_setpoint_cmd(value)) diff --git a/el15/protocol_constants.py b/el15/protocol_constants.py new file mode 100644 index 0000000..e0721e6 --- /dev/null +++ b/el15/protocol_constants.py @@ -0,0 +1,95 @@ +"""EL15 protocol constants and packet parsing.""" +import ctypes as _ct + +HEADER = b"\xdf\x07\x03\x08" +# Pre-computed poll packet (CMD_QUERY prefix + CRC byte 0x3F) +POLL_PKT = b"\xaf\x07\x03\x08\x00\x3f" + +CMD_LOAD_ON = b"\xaf\x07\x03\x09\x01\x04" +CMD_LOAD_OFF = b"\xaf\x07\x03\x09\x01\x00" +CMD_LOCK = b"\xaf\x07\x03\x09\x01\x01" +CMD_MODE_CC = b"\xaf\x07\x03\x03\x01\x01" +CMD_MODE_CAP = b"\xaf\x07\x03\x03\x01\x02" +CMD_MODE_CV = b"\xaf\x07\x03\x03\x01\x09" +CMD_MODE_DCR = b"\xaf\x07\x03\x03\x01\x0a" +CMD_MODE_CR = b"\xaf\x07\x03\x03\x01\x11" +CMD_MODE_CP = b"\xaf\x07\x03\x03\x01\x19" + +MODE_CC = 0x01 +MODE_CAP = 0x02 +MODE_CV = 0x09 +MODE_DCR = 0x0A +MODE_CR = 0x11 +MODE_CP = 0x19 + +MODE_NAMES = { + MODE_CC: "CC", + MODE_CAP: "CAP", + MODE_CV: "CV", + MODE_DCR: "DCR", + MODE_CR: "CR", + MODE_CP: "CP", +} + +# (unit_str, decimal_places, label) +MODE_SETPOINT_INFO = { + MODE_CC: ("A", 3, "Current"), + MODE_CAP: ("A", 3, "Current"), + MODE_CV: ("V", 3, "Voltage"), + MODE_DCR: ("A", 3, "Current"), + MODE_CR: ("Ω", 1, "Resistance"), + MODE_CP: ("W", 2, "Power"), +} + + +def build_set_setpoint_cmd(value: float) -> bytes: + """Return a 9-byte setpoint command prefix (caller appends CRC).""" + f = _ct.c_float(value) + return b"\xaf\x07\x03\x04\x04" + _ct.string_at(_ct.addressof(f), 4) + + +class EL15Status: + __slots__ = ( + "raw", "crc_ok", "valid", + "voltage", "current", "power", "runtime", "temperature", "setpoint", + "mode", "mode_name", "fan_speed", "load_on", + "setpoint_unit", "setpoint_decimals", "setpoint_label", + ) + + def __init__(self): + self.raw = "" + self.crc_ok = False + self.valid = False + self.voltage = self.current = self.power = 0.0 + self.runtime = 0 + self.temperature = self.setpoint = 0.0 + self.mode = MODE_CC + self.mode_name = "---" + self.fan_speed = 0 + self.load_on = False + self.setpoint_unit = "A" + self.setpoint_decimals = 3 + self.setpoint_label = "Current" + + +def parse_status_packet(data: bytes) -> EL15Status: + """Parse a 28-byte EL15 status notification into EL15Status.""" + s = EL15Status() + s.raw = data.hex(" ").upper() + s.crc_ok = (sum(data) & 0xFF) == 0 + if len(data) < 28 or data[:4] != HEADER: + return s + s.voltage = _ct.c_float.from_buffer_copy(data, 7).value + s.current = _ct.c_float.from_buffer_copy(data, 11).value + s.runtime = _ct.c_int32.from_buffer_copy(data, 15).value + s.temperature = _ct.c_float.from_buffer_copy(data, 19).value + s.setpoint = _ct.c_float.from_buffer_copy(data, 23).value + s.power = s.voltage * s.current + s.mode = data[5] & 0x0F + s.fan_speed = data[5] >> 4 + s.load_on = data[6] != 0 + s.mode_name = MODE_NAMES.get(s.mode, f"?{s.mode:02X}") + info = MODE_SETPOINT_INFO.get(s.mode, ("?", 3, "Setpoint")) + s.setpoint_unit, s.setpoint_decimals, s.setpoint_label = info + s.valid = True + return s diff --git a/main.py b/main.py index 66ca728..fc7bb46 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,10 @@ -if '__compiled__' in globals(): # type: ignore[name-defined] +if '__compiled__' in globals(): # type: ignore[name-defined] import shims shims.install() def main() -> None: - from dm40.app import DM40App - - app = DM40App() - app.mainloop() + from shared.base_app import App + App().mainloop() if __name__ == "__main__": main() diff --git a/shared/base_app.py b/shared/base_app.py new file mode 100644 index 0000000..e672320 --- /dev/null +++ b/shared/base_app.py @@ -0,0 +1,483 @@ +"""Single-window application.""" +import _thread +import time +import tkinter as tk +from tkinter import ttk + +from shared import mini_asyncio as asyncio +from shared.nanowinbt.scanner import NanoScanner + +from GUI.controls import UIControls +from GUI.theme_manager import ThemeManager +from GUI.themed_messagebox import show_error +from GUI.widgets.autoscrollbar import AutoScrollbar +from GUI.widgets.find_popup import FindPopup +from GUI.widgets.helpers import ensure_dpi_awareness, theme_title_bar +from GUI.widgets.menubar import OwnerDrawnMenuBar +from GUI.widgets.themed_button import ThemedButton +from GUI.widgets.waveform_view import WaveformView + +_DEVICE_TYPES = ("DM40", "EL15") + + +def _guess_device_type(name: str) -> str | None: + upper = name.upper() + for dtype in _DEVICE_TYPES: + if upper.startswith(dtype): + return dtype + return None + + +class App(tk.Tk): + + def __init__(self) -> None: + super().__init__() + ensure_dpi_awareness() + + self.title("DM40GUI") + self.minsize(916, 650) + self.wm_geometry("916x650") + + self.style = ttk.Style(self) + self._theme_manager = ThemeManager(self, self.style, self._apply_theme) + initial_theme = self._theme_manager.get_active_theme() + self.ui = UIControls(self, self.style, theme=initial_theme) + theme_title_bar(self, border_color=initial_theme.outline, caption_color=initial_theme.bg) + + self._worker = None + self._is_connected = False + self._handler = None + self._handler_type = None + + self._devices = [] + self._device_index_by_address: dict[str, int] = {} + self._scan_in_progress = False + self._scan_generation = 0 + self._scan_cancel = None + + self._reset_stats() + self._rate_count = 0 + self._rate_start = self._start_time = 0.0 + + self._build_ui() + + self.bind_all("", self._toggle_wave_pause) + self.bind_all("", self._save_wave_csv) + self.bind_all("", self._toggle_wave_record) + self.protocol("WM_DELETE_WINDOW", self._on_close) + + def _reset_stats(self) -> None: + self._stats_count = 0 + self._stats_sum = self._stats_min = self._stats_max = 0.0 + + def _push_stats(self, value: float) -> tuple[float, float, float]: + if self._stats_count == 0: + self._stats_min = self._stats_max = self._stats_sum = value + else: + if value < self._stats_min: + self._stats_min = value + if value > self._stats_max: + self._stats_max = value + self._stats_sum += value + self._stats_count += 1 + return self._stats_min, self._stats_max, self._stats_sum / self._stats_count + + def _build_ui(self) -> None: + self._menu_bar = OwnerDrawnMenuBar( + self, + menus=[ + ("File", [ + ("Save Buffer", self._save_wave_csv), + ("Record", self._toggle_wave_record), + "separator", + ("Clear", self._clear_capture_data), + "separator", + ("Exit", self._on_close), + ]), + ("Themes", []), + ], + theme_manager=self._theme_manager, + on_theme=self._apply_theme, + ) + self._menu_bar.pack(fill=tk.X) + self._menu_bar.grid_columnconfigure(2, weight=1) + + self._menubar_pre = tk.Frame(self._menu_bar) + self._menubar_pre.grid(row=0, column=3, sticky="e") + + self._runtime_var = tk.StringVar(value="") + tk.Label(self._menu_bar, textvariable=self._runtime_var).grid( + row=0, column=4, sticky="w" + ) + + self._menubar_post = tk.Frame(self._menu_bar) + self._menubar_post.grid(row=0, column=5, sticky="e") + + self._status_var = tk.StringVar(value="Disconnected") + + root = tk.Frame(self, padx=12, pady=12) + root.pack(fill=tk.BOTH, expand=True) + root.columnconfigure(0, weight=1) + + mid = tk.Frame(root) + mid.grid(row=0, column=0, sticky="nsew") + mid.columnconfigure(0, weight=1) + root.rowconfigure(0, weight=3) + + self._reading_host = tk.Frame(mid) + self._reading_host.pack(fill=tk.X) + tk.Label(self._reading_host, text="Connect to a compatible device", + font=("Segoe UI", 16), pady=40).pack() + + self._wave_view = WaveformView(mid, colors=self.ui.theme, capacity=600) + self._wave_view.pack(fill=tk.BOTH, expand=True, pady=(10, 0)) + + self._stats_var = tk.StringVar(value="") + tk.Label(mid, textvariable=self._stats_var, font=("Consolas", 10), + anchor="w").pack(fill=tk.X, pady=(0, 10)) + + raw_frame = tk.Frame(root) + raw_frame.grid(row=1, column=0, sticky="nsew") + raw_frame.columnconfigure(0, weight=1) + raw_frame.rowconfigure(1, weight=1) + root.rowconfigure(1, weight=1) + + tk.Label(raw_frame, text="Raw packets:").grid(row=0, column=0, sticky="w") + + self._raw_text = tk.Text( + raw_frame, wrap="none", height=10, relief="flat", + highlightthickness=2, undo=False, maxundo=0, autoseparators=False, + ) + self._raw_text.grid(row=1, column=0, sticky="nsew", pady=(6, 0)) + + yscroll = AutoScrollbar( + raw_frame, orient="vertical", command=self._raw_text.yview, + style="Arrowless.Vertical.TScrollbar", + ) + yscroll.grid(row=1, column=1, sticky="ns", pady=(6, 0)) + self._raw_text.configure(yscrollcommand=yscroll.set, state="disabled") + + device_panel = tk.Frame(raw_frame) + device_panel.grid(row=0, column=2, rowspan=2, sticky="nsw", padx=(12, 0)) + device_panel.rowconfigure(1, weight=1) + device_panel.columnconfigure(0, weight=0) + device_panel.columnconfigure(1, weight=1) + device_panel.columnconfigure(2, weight=0) + + tk.Label(device_panel, text="Devices:").grid(row=0, column=0, sticky="w") + tk.Label(device_panel, textvariable=self._status_var).grid( + row=0, column=1, columnspan=2, sticky="e" + ) + + self._device_listbox = tk.Listbox( + device_panel, height=10, width=58, exportselection=False, + activestyle="none", highlightthickness=2, relief="flat", + ) + + def _on_click(event): + index = self._device_listbox.nearest(event.y) + bbox = self._device_listbox.bbox(index) + if bbox is None or event.y > bbox[1] + bbox[3]: + return "break" + + self._device_listbox.bind("", _on_click) + self._device_listbox.grid(row=1, column=0, columnspan=3, sticky="nsew", pady=(6, 6)) + + ThemedButton(device_panel, text="Scan", command=self.scan_devices).grid( + row=2, column=0, sticky="w" + ) + ThemedButton(device_panel, text="Connect", command=self.connect).grid( + row=2, column=1, sticky="e", padx=(0, 6) + ) + ThemedButton(device_panel, text="Disconnect", command=self.disconnect).grid( + row=2, column=2, sticky="e" + ) + + self._find = FindPopup( + raw_frame, self._raw_text, self.ui.theme, + grid_opts={"row": 1, "column": 0, "sticky": "ne", "padx": 6, "pady": (10, 0)}, + ) + + self._control_bar = tk.Frame(root) + self._control_bar.grid(row=2, column=0, sticky="w", pady=(8, 0)) + root.rowconfigure(2, weight=0) + + def _setup_handler(self, dtype: str) -> None: + if dtype == self._handler_type: + return + if self._handler: + self._handler.teardown() + for frame in (self._reading_host, self._menubar_pre, self._menubar_post, self._control_bar): + for w in frame.winfo_children(): + w.destroy() + self._handler_type = dtype + if dtype == "DM40": + from dm40.app import DM40Handler + self._handler = DM40Handler(self) + else: + from el15.app import EL15Handler + self._handler = EL15Handler(self) + self._handler.build_reading_area(self._reading_host) + self._handler.build_menubar_labels(self._menubar_pre, self._menubar_post) + self._handler.build_control_bar(self._control_bar) + + def _apply_theme(self, theme) -> None: + self.ui.use_theme(theme) + theme_title_bar(self, border_color=theme.outline, caption_color=theme.bg) + self._wave_view.set_colors(self.ui.theme) + self._find.set_tag_colors(self.ui.theme) + + _CSV_DIR = __compiled__.containing_dir if '__compiled__' in globals() else __file__.rsplit('\\', 2)[0] # type: ignore[name-defined] + + def _csv_path(self, prefix: str) -> str: + return f"{self._CSV_DIR}\\{prefix}_{time.strftime('%Y%m%d_%H%M%S')}.csv" + + def _toggle_wave_pause(self, _event=None) -> None: + self._wave_view.toggle_pause() + self._refresh_status_bar() + + def _save_wave_csv(self, _event=None) -> None: + prefix = self._handler.csv_prefix if self._handler else "capture" + self._wave_view.save_buffer_csv(self._csv_path(prefix)) + + def _toggle_wave_record(self, _event=None) -> None: + prefix = self._handler.csv_prefix if self._handler else "capture" + if self._wave_view.recording: + self._wave_view.stop_recording() + else: + self._wave_view.toggle_recording(self._csv_path(f"{prefix}_rec")) + self._refresh_status_bar() + + def _clear_capture_data(self) -> None: + self._wave_view.clear() + self._raw_text.configure(state="normal") + self._raw_text.delete("1.0", "end") + self._raw_text.configure(state="disabled") + self._reset_stats() + self._stats_var.set("") + if self._handler: + self._handler.clear_capture() + self._refresh_status_bar() + + def send_command(self, cmd_prefix: bytes) -> None: + if not self._worker or not self._worker.alive: + show_error(self, "Command", "Connect to a device before sending commands.", + theme=(self.ui.theme.bg, self.ui.theme.outline)) + return + self._worker.set_command(b"%b%c" % (cmd_prefix, (-sum(cmd_prefix)) & 0xFF)) + + def _radio_off(self, title: str) -> bool: + if NanoScanner.radio_state() != "off": + return False + msg = "Bluetooth radio is OFF. Turn Bluetooth on and try again." + self._status_var.set(msg) + show_error(self, title, msg, theme=(self.ui.theme.bg, self.ui.theme.outline)) + return True + + def scan_devices(self) -> None: + if self._scan_in_progress or self._radio_off("Scan"): + return + self._scan_generation += 1 + scan_id = self._scan_generation + self._scan_in_progress = True + self._scan_cancel = None + self._devices = [] + self._device_index_by_address.clear() + self._device_listbox.delete(0, tk.END) + self._status_var.set("Scanning …") + _thread.start_new_thread(self._scan_worker, (scan_id,)) + + def _scan_worker(self, scan_id: int) -> None: + def on_device(device) -> None: + self.after(0, self._scan_add_device, scan_id, device) + + def register_cancel(cancel_scan) -> None: + if scan_id == self._scan_generation: + self._scan_cancel = cancel_scan + else: + cancel_scan() + + try: + devices = asyncio.run( + NanoScanner.discover( + scanning_mode="active", + on_device=on_device, + timeout=3.0, + on_cancel_register=register_cancel, + require_radio_check=False, + ) + ) + except Exception as exc: + self.after(0, self._scan_done, scan_id, [], exc) + return + self.after(0, self._scan_done, scan_id, devices or [], None) + + def _scan_add_device(self, scan_id: int, device) -> None: + if scan_id != self._scan_generation or not device.address: + return + addr, name = device.address, device.name or "Unknown" + idx = self._device_index_by_address.get(addr) + if idx is not None: + self._devices[idx].name = device.name + label = f"{name} ({addr})" + if name != "Unknown" and self._device_listbox.get(idx) != label: + self._device_listbox.delete(idx) + self._device_listbox.insert(idx, label) + return + self._device_index_by_address[addr] = len(self._devices) + self._devices.append(device) + self._device_listbox.insert(tk.END, f"{name} ({addr})") + + def _scan_done(self, scan_id: int, devices: list, exc) -> None: + if scan_id != self._scan_generation: + return + self._scan_in_progress = False + self._scan_cancel = None + if exc: + self._status_var.set("Scan failed") + show_error(self, "Scan", f"BLE scan failed: {exc!r}", + theme=(self.ui.theme.bg, self.ui.theme.outline)) + return + for d in devices: + self._scan_add_device(scan_id, d) + n = len(self._devices) + self._status_var.set(f"Found {n} device{'s' if n != 1 else ''}") + + def connect(self) -> None: + if self._scan_in_progress: + self._scan_generation += 1 + self._scan_in_progress = False + cancel, self._scan_cancel = self._scan_cancel, None + if cancel: + try: cancel() + except Exception: pass + if self._radio_off("Connect"): + return + sel = self._device_listbox.curselection() + device = self._devices[sel[0]] if sel else None + if not device: + show_error(self, "Connect", "Select a device from the list.", + theme=(self.ui.theme.bg, self.ui.theme.outline)) + return + if self._worker and self._worker.alive: + return + self._worker = None + + dtype = _guess_device_type(device.name or "") + if dtype: + self._start_connection(device, dtype) + else: + self._status_var.set("Identifying device …") + _thread.start_new_thread(self._probe_and_connect, (device,)) + + def _probe_and_connect(self, device) -> None: + from shared.ble_worker import probe_device_type + dtype = probe_device_type(device) + def _finish(): + if self._worker and self._worker.alive: + return + if dtype: + self._start_connection(device, dtype) + else: + self._status_var.set("Unrecognized device") + show_error(self, "Connect", + f"'{device.name or device.address}' is not a recognized device.", + theme=(self.ui.theme.bg, self.ui.theme.outline)) + self.after(0, _finish) + + def _start_connection(self, device, dtype: str) -> None: + self._setup_handler(dtype) + handler = self._handler + assert handler is not None + handler.pre_connect_reset() + self._wave_view.clear() + self._reset_stats() + self._stats_var.set("") + self._status_var.set(f"Connecting to {device.name or device.address} …") + + self._alive = True + _alive = self.__dict__ + def _ui(method): + def _dispatch(*a): + self.after(0, _guarded, method, *a) + return _dispatch + def _guarded(method, *a): + if '_alive' in _alive: + method(*a) + self._refresh_status_bar() + + self._worker = handler.create_worker( + device, + on_packet=_ui(handler.on_packet), + on_tx=_ui(lambda p: self._append_raw_text(f"TX {p.hex(' ').upper()}\n")), + on_status=_ui(self._status_var.set), + on_error=_ui(self._on_worker_error), + on_connected=_ui(self._on_worker_connected), + on_disconnected=_ui(self._on_worker_disconnected), + ) + + def disconnect(self) -> None: + self.__dict__.pop('_alive', None) + if self._worker: + self._worker.stop() + self._worker = None + self._status_var.set("Disconnected") + self._is_connected = False + self._rate_count = 0 + if self._handler: + self._handler.set_control_state(False) + self._refresh_status_bar() + + def _on_close(self) -> None: + self.disconnect() + self.destroy() + + def _on_worker_connected(self, _address: str) -> None: + self._is_connected = True + self._rate_count = 0 + self._start_time = self._rate_start = time.monotonic() + self._handler.set_control_state(True) # type: ignore[union-attr] + self._handler.on_connected() # type: ignore[union-attr] + + def _on_worker_disconnected(self, _address: str) -> None: + self._is_connected = False + self._rate_count = 0 + self._status_var.set("Disconnected") + self._worker = None + self._handler.set_control_state(False) # type: ignore[union-attr] + + def _on_worker_error(self, message: str) -> None: + self._status_var.set(message) + show_error(self, self._handler.title, message, theme=(self.ui.theme.bg, self.ui.theme.outline)) # type: ignore[union-attr] + self._handler.set_control_state(False) # type: ignore[union-attr] + + def _append_raw_text(self, text: str) -> None: + self._raw_text.configure(state="normal") + self._raw_text.insert("end", text) + self._raw_text.see("end") + self._raw_text.configure(state="disabled") + + def _refresh_status_bar(self) -> None: + now = time.monotonic() + title_base = self._handler.title if self._handler_type else "DM40GUI" # type: ignore[union-attr] + if self._is_connected: + elapsed = int(now - self._start_time) + self._runtime_var.set( + "RUN %02d:%02d:%02d" % (elapsed // 3600, (elapsed % 3600) // 60, elapsed % 60) + ) + dt = now - self._rate_start + rate = self._rate_count / dt if dt > 0 else 0.0 + if dt >= 2.0: + self._rate_count = 0 + self._rate_start = now + title = f"{title_base} - {rate:.1f} samples/s" + else: + self._runtime_var.set("") + title = title_base + if self._wave_view.paused: + title += " \u23F8 PAUSED" + if self._wave_view.recording: + title += " \u23FA REC" + self.title(title) + if self._handler_type: + self._handler.refresh_status(now) # type: ignore[union-attr] diff --git a/shared/ble_worker.py b/shared/ble_worker.py new file mode 100644 index 0000000..42c6390 --- /dev/null +++ b/shared/ble_worker.py @@ -0,0 +1,237 @@ +"""BLE transport worker.""" + +import _thread +import time + +from shared import mini_asyncio as asyncio +from shared.nanowinbt.client import NanoClient + +_SERVICE_UUID = "0000fff0-0000-1000-8000-00805f9b34fb" +_NOTIFY_UUID = "0000fff1-0000-1000-8000-00805f9b34fb" +_WRITE_UUID = "0000fff3-0000-1000-8000-00805f9b34fb" + +_CMD_DISCOVERY = b"\xaf\xff\xff\x00\x00\x53" +_DISCOVERY_HEADER = b"\xdf\xff\xff\x00" +_FAMILY_MAP = {b"\x05\x03": "DM40", b"\x07\x03": "EL15"} + + +def probe_device_type(device, timeout: float = 6.0) -> str | None: + """Connect, send discovery command, return 'DM40'/'EL15' or None. + Blocking — call from a background thread.""" + result = [None] + try: + asyncio.run(_probe(device, timeout, result)) + except Exception: + pass + return result[0] + + +async def _probe(device, timeout, result): + responses = [] + loop = asyncio.get_running_loop() + got = asyncio.Event() + + def on_notify(data: bytes): + responses.append(data) + loop.call_soon_threadsafe(got.set) + + async with NanoClient(device, timeout=timeout) as client: + await client.prime_gatt(_SERVICE_UUID, [_NOTIFY_UUID, _WRITE_UUID], 8) + await client.start_notify(_NOTIFY_UUID, on_notify) + await client.write_gatt_char(_WRITE_UUID, _CMD_DISCOVERY) + + deadline_handle = loop.call_later(3.0, got.set) + await got.wait() + deadline_handle.cancel() + + for r in responses: + if r.startswith(_DISCOVERY_HEADER) and len(r) >= 7: + family = _FAMILY_MAP.get(r[5:7]) + if family: + result[0] = family + break + + try: + await client.stop_notify(_NOTIFY_UUID) + except Exception: + pass + + +class BleWorker: + __slots__ = ( + "device", "_on_packet", "_on_tx", "_on_status", "_on_error", + "_on_connected", "_on_disconnected", "_stopping", "alive", + "_pending_cmd", "_loop", "_io_event", + "_poll_cmd", "_init_cmd", "_notify_hook", "_write_buf_size", + ) + + def __init__( + self, + device, + *, + poll_cmd: bytes, + init_cmd: bytes | None = None, + notify_hook=None, + write_buf_size: int = 20, + on_packet=None, + on_tx=None, + on_status=None, + on_error=None, + on_connected=None, + on_disconnected=None, + ): + self.device = device + self._poll_cmd = poll_cmd + self._init_cmd = init_cmd + self._notify_hook = notify_hook + self._write_buf_size = write_buf_size + self._on_packet = on_packet + self._on_tx = on_tx + self._on_status = on_status + self._on_error = on_error + self._on_connected = on_connected + self._on_disconnected = on_disconnected + self._stopping = False + self.alive = True + self._pending_cmd: bytes | None = None + self._loop = None + self._io_event = None + try: + _thread.start_new_thread(self.run, ()) + except Exception: + self.alive = False + raise + + def stop(self) -> None: + self._stopping = True + self._wake() + + def _wake(self) -> None: + loop = self._loop + io_event = self._io_event + if loop is not None and io_event is not None: + loop.call_soon_threadsafe(io_event.set) + + def set_command(self, payload: bytes) -> None: + self._pending_cmd = payload + self._wake() + + def run(self) -> None: + loop = asyncio.new_event_loop() + self._loop = loop + try: + loop.run_until_complete(self._main()) + except Exception as exc: + on_error = self._on_error + if on_error: + on_error(f"Worker failed: {exc!r}") + finally: + self._loop = None + self._io_event = None + loop.close() + self.alive = False + + async def _main(self) -> None: + device = self.device + address = device.address + on_packet = self._on_packet + on_tx = self._on_tx + on_status = self._on_status + on_error = self._on_error + on_connected = self._on_connected + on_disconn = self._on_disconnected + poll_cmd = self._poll_cmd + init_cmd = self._init_cmd + notify_hook = self._notify_hook + + last_rx = 0.0 + no_data_emitted = False + disconnected = False + disconnect_notified = False + read_ready = True + io_event = asyncio.Event() + self._io_event = io_event + + def _notify_disconnect() -> None: + nonlocal disconnect_notified + if disconnect_notified: + return + disconnect_notified = True + if on_status: + on_status(f"disconnected — {address}") + if on_disconn: + on_disconn(address) + + def on_notify(data: bytes) -> None: + nonlocal last_rx, no_data_emitted, read_ready + last_rx = time.monotonic() + no_data_emitted = False + read_ready = True + io_event.set() + if (notify_hook is None or notify_hook(data)) and on_packet: + on_packet(data) + + def on_disconnect() -> None: + nonlocal disconnected + disconnected = True + self._wake() + _notify_disconnect() + + if on_status: + on_status(f"connecting — {address}") + async with NanoClient(device, disconnected_callback=on_disconnect) as client: + await client.prime_gatt(_SERVICE_UUID, [_NOTIFY_UUID, _WRITE_UUID], self._write_buf_size) + await client.start_notify(_NOTIFY_UUID, on_notify) + + if init_cmd is not None: + try: + await client.write_gatt_char(_WRITE_UUID, init_cmd) + except Exception as exc: + if on_error: + on_error(f"Init failed: {exc!r}") + return + if on_tx: + on_tx(init_cmd) + + if on_status: + on_status(f"connected — {address}") + if on_connected: + on_connected(address) + io_event.set() + + try: + while not self._stopping: + if disconnected: + _notify_disconnect() + break + + await io_event.wait() + io_event.clear() + if self._stopping or disconnected: + break + + if read_ready: + cmd = self._pending_cmd + self._pending_cmd = None + payload = poll_cmd if cmd is None else cmd + try: + await client.write_gatt_char(_WRITE_UUID, payload) + except Exception as exc: + if on_error: + on_error(f"{'Write' if cmd else 'Poll'} failed: {exc!r}") + return + if cmd is not None and on_tx: + on_tx(payload) + read_ready = False + + if last_rx and not no_data_emitted and (time.monotonic() - last_rx) > 2.0: + if on_status: + on_status(f"connected — {address} — (no data)") + no_data_emitted = True + + finally: + try: + await client.stop_notify(_NOTIFY_UUID) + except Exception: + pass + _notify_disconnect() diff --git a/dm40/mini_asyncio.py b/shared/mini_asyncio.py similarity index 95% rename from dm40/mini_asyncio.py rename to shared/mini_asyncio.py index 711f26d..aab2929 100644 --- a/dm40/mini_asyncio.py +++ b/shared/mini_asyncio.py @@ -5,6 +5,7 @@ _tls = _thread._local() +_tls.running_loop = None class TimerHandle: @@ -194,13 +195,10 @@ def _run_once(self) -> None: callback(*args) return - sleep_for = 0.001 if self._timers: - sleep_for = self._timers[0][0] - now - if sleep_for < 0.0: - sleep_for = 0.0 - elif sleep_for > 0.01: - sleep_for = 0.01 + sleep_for = min(max(self._timers[0][0] - now, 0.0), 0.01) + else: + sleep_for = 0.001 time.sleep(sleep_for) @@ -209,7 +207,7 @@ def new_event_loop(): def get_running_loop(): - loop = getattr(_tls, "running_loop", None) + loop = _tls.running_loop if loop is None: raise RuntimeError("no running event loop") return loop diff --git a/dm40/nanowinbt/client.py b/shared/nanowinbt/client.py similarity index 75% rename from dm40/nanowinbt/client.py rename to shared/nanowinbt/client.py index 7b5f0ee..124f468 100644 --- a/dm40/nanowinbt/client.py +++ b/shared/nanowinbt/client.py @@ -4,14 +4,13 @@ from . import ctypes_winrt as w from .ctypes_com import RoSession from .scanner import _await_ptr -from dm40.types import NanoBLEDevice class NanoClientError(RuntimeError): pass class NanoClient: - def __init__(self, connect_target: NanoBLEDevice, disconnected_callback=None, *, timeout: float = 30.0): + def __init__(self, connect_target, disconnected_callback=None, *, timeout: float = 30.0): self._target = connect_target self._disconnected_callback = disconnected_callback self._timeout = timeout @@ -23,8 +22,8 @@ def __init__(self, connect_target: NanoBLEDevice, disconnected_callback=None, *, self._status_token = None self._status_delegate = None self._write_buffer: w.ComPtr | None = None + self._write_buffer_ptr = None self._write_buffer_data_ptr = None - self._write_buffer_capacity = 0 async def __aenter__(self): await self.connect() @@ -64,7 +63,7 @@ async def connect(self) -> None: statics.release() if not device_ptr: - raise NanoClientError(f"Device not found for address: {bt_addr:#x}") + raise NanoClientError("Device not found for address: %#x" % bt_addr) self._device = w.ComPtr(device_ptr) w.btle_device6_request_throughput_params(device_ptr) self._register_disconnect_handler() @@ -73,8 +72,7 @@ async def disconnect(self) -> None: self._unregister_disconnect_handler() self._release_all() - async def prime_gatt(self, service_uuid: str, char_uuids: list[str]) -> None: - """Discover service and resolve all characteristics in one async call.""" + async def prime_gatt(self, service_uuid: str, char_uuids: list[str], write_buffer_size: int) -> None: device = self._device if device is None: raise NanoClientError("Not connected") @@ -87,22 +85,47 @@ async def prime_gatt(self, service_uuid: str, char_uuids: list[str]) -> None: ptrs = [] try: device3 = device.query_interface(w.IID_BLUETOOTH_LE_DEVICE3); ptrs.append(device3) - services_result_ptr = await _await_ptr( - w.btle_device3_get_gatt_services_for_uuid_async(device3.ptr, target_service), - w.IID_ASYNC_COMPLETED_HANDLER_GATT_DEVICE_SERVICES_RESULT, - 3.5, - "client.prime.services", - ) - services_result = w.ComPtr(services_result_ptr); ptrs.append(services_result) - if w.gatt_services_result_get_status(services_result.ptr) != 0: - raise NanoClientError("GATT service query failed") + for attempt in range(3): + if attempt: + _delay = asyncio.Event() + asyncio.get_running_loop().call_later(0.8, _delay.set) + await _delay.wait() + services_result_ptr = await _await_ptr( + w.btle_device3_get_gatt_services_for_uuid_async(device3.ptr, target_service), + w.IID_ASYNC_COMPLETED_HANDLER_GATT_DEVICE_SERVICES_RESULT, + 5.0, + "client.prime.services", + ) + services_result = w.ComPtr(services_result_ptr); ptrs.append(services_result) + svc_status = w.gatt_services_result_get_status(services_result.ptr) + if svc_status == 0: + break + services_result.release() + ptrs.pop() + else: + _STATUS_NAMES = {0: "Success", 1: "Unreachable", 2: "ProtocolError", 3: "AccessDenied"} + conn_status = w.btle_device_get_connection_status(device.ptr) + msg = ( + f"GATT service query failed after {attempt + 1} attempts\n" + f" status={svc_status} ({_STATUS_NAMES.get(svc_status, 'Unknown')})\n" + f" service_uuid={service_uuid}\n" + f" target_service_guid={bytes(target_service).hex()}\n" + f" device_ptr={device.ptr}\n" + f" device3_ptr={device3.ptr}\n" + f" connection_status={conn_status} ({'Connected' if conn_status == 1 else 'Disconnected'})\n" + f" bt_address={self._target.bluetooth_address:#x}\n" + f" char_uuids={char_uuids}\n" + f" missing={missing}\n" + f" write_buffer_size={write_buffer_size}" + ) + print(msg) + raise NanoClientError(msg) services_view = w.ComPtr(w.gatt_services_result_get_services(services_result.ptr)); ptrs.append(services_view) if w.vector_view_get_size(services_view.ptr) == 0: raise NanoClientError(f"Service not found: {service_uuid}") service = w.ComPtr(w.vector_view_get_at(services_view.ptr, 0)); ptrs.append(service) service3 = service.query_interface(w.IID_GATT_DEVICE_SERVICE3); ptrs.append(service3) - # Single call to get ALL characteristics, then match by UUID in Python chars_result_ptr = await _await_ptr( w.gatt_service3_get_characteristics_async(service3.ptr), w.IID_ASYNC_COMPLETED_HANDLER_GATT_CHARACTERISTICS_RESULT, @@ -114,7 +137,7 @@ async def prime_gatt(self, service_uuid: str, char_uuids: list[str]) -> None: raise NanoClientError("Characteristic query failed") chars_view = w.ComPtr(w.gatt_characteristics_result_get_characteristics(chars_result.ptr)); ptrs.append(chars_view) count = w.vector_view_get_size(chars_view.ptr) - targets = {bytes(w._guid(u)): u for u in missing} + targets = {w._bguid(u): u for u in missing} for i in range(count): cp = w.vector_view_get_at(chars_view.ptr, i) matched = targets.pop(bytes(w.gatt_characteristic_get_uuid(cp)), None) @@ -128,19 +151,18 @@ async def prime_gatt(self, service_uuid: str, char_uuids: list[str]) -> None: for p in reversed(ptrs): p.release() - # Pre-create write buffer (avoids RoGetActivationFactory on first write) - self._get_or_grow_write_buffer(20) + self._write_buffer = w.ComPtr(w.buffer_factory_create(write_buffer_size)) + self._write_buffer_ptr = self._write_buffer.ptr + self._write_buffer_data_ptr = w.buffer_get_data_ptr(self._write_buffer_ptr) async def write_gatt_char(self, char_uuid: str, data) -> None: char = self._require_char(char_uuid) - payload = data if isinstance(data, bytes) else bytes(data) - buf = self._get_or_grow_write_buffer(len(payload)) - ctypes.memmove(self._write_buffer_data_ptr, payload, len(payload)) # type: ignore[arg-type] - w.buffer_set_length(buf.ptr, len(payload)) + ctypes.memmove(self._write_buffer_data_ptr, data, len(data)) # type: ignore[arg-type] + w.buffer_set_length(self._write_buffer_ptr, len(data)) await _await_ptr( w.gatt_characteristic_write_value_with_option_async( char.ptr, - buf.ptr, + self._write_buffer_ptr, w.GATT_WRITE_OPTION_WITHOUT_RESPONSE, ), w.IID_ASYNC_COMPLETED_HANDLER_GATT_COMM_STATUS, @@ -149,21 +171,6 @@ async def write_gatt_char(self, char_uuid: str, data) -> None: get_results=False, ) - def _get_or_grow_write_buffer(self, needed: int) -> w.ComPtr: - current = self._write_buffer - if current is not None and self._write_buffer_capacity >= needed: - return current - if current is not None: - try: - current.release() - except Exception: - pass - capacity = needed if needed > 0 else 1 - self._write_buffer = w.ComPtr(w.buffer_factory_create(capacity)) - self._write_buffer_capacity = capacity - self._write_buffer_data_ptr = w.buffer_get_data_ptr(self._write_buffer.ptr) - return self._write_buffer - async def start_notify(self, char_uuid: str, callback) -> None: char = self._require_char(char_uuid) if char_uuid in self._notify_tokens: @@ -204,17 +211,16 @@ async def stop_notify(self, char_uuid: str) -> None: if token_entry is None: return - char = token_entry[0] await _await_ptr( - w.gatt_characteristic_write_cccd_async(char.ptr, w.GATT_CCCD_NONE), + w.gatt_characteristic_write_cccd_async(token_entry[0].ptr, w.GATT_CCCD_NONE), w.IID_ASYNC_COMPLETED_HANDLER_GATT_COMM_STATUS, self._timeout, "client.notify.stop", get_results=False, ) - self._notify_tokens.pop(char_uuid, None) - w.gatt_characteristic_remove_value_changed(char.ptr, token_entry[1]) + del self._notify_tokens[char_uuid] + w.gatt_characteristic_remove_value_changed(token_entry[0].ptr, token_entry[1]) def _register_disconnect_handler(self) -> None: if self._device is None: @@ -248,8 +254,8 @@ def _release_all(self) -> None: try: self._write_buffer.release() except Exception: pass self._write_buffer = None + self._write_buffer_ptr = None self._write_buffer_data_ptr = None - self._write_buffer_capacity = 0 for char, token, _delegate in self._notify_tokens.values(): try: w.gatt_characteristic_remove_value_changed(char.ptr, token) diff --git a/dm40/nanowinbt/ctypes_com.py b/shared/nanowinbt/ctypes_com.py similarity index 100% rename from dm40/nanowinbt/ctypes_com.py rename to shared/nanowinbt/ctypes_com.py diff --git a/dm40/nanowinbt/ctypes_winrt.py b/shared/nanowinbt/ctypes_winrt.py similarity index 94% rename from dm40/nanowinbt/ctypes_winrt.py rename to shared/nanowinbt/ctypes_winrt.py index 1e540ff..8adb675 100644 --- a/dm40/nanowinbt/ctypes_winrt.py +++ b/shared/nanowinbt/ctypes_winrt.py @@ -44,11 +44,12 @@ def _guid(s: str, b[6:8] = b[7:5:-1] return _f(b) +def _bguid(s: str, _r=str.replace, _h=bytes.fromhex): + b = _h(_r(s, "-", "")) + return b[3::-1]+b[5:3:-1]+b[7:5:-1]+b[8:] + IID_IASYNC_INFO = _guid("00000036-0000-0000-C000-000000000046") -IID_IAGILE_OBJECT = _guid("94EA2B94-E9CC-49E0-C0FF-EE64CA8F5B90") -IID_IUNKNOWN = _guid("00000000-0000-0000-C000-000000000046") -# Typed async completion handler IIDs from WinRT metadata (System.Runtime.WindowsRuntime). IID_ASYNC_COMPLETED_HANDLER_BLUETOOTH_LE_DEVICE = _guid( "9156B79F-C54A-5277-8F8B-D2CC43C7E004" ) @@ -62,7 +63,6 @@ def _guid(s: str, "2154117A-978D-59DB-99CF-6B690CB3389B" ) -# Interface IIDs gathered from runtime introspection and WinSDK 22621 headers. IID_BLUETOOTH_LE_ADVERTISEMENT_WATCHER = _guid( "A6AC336F-F3D3-4297-8D6C-C81EA6623F40" ) @@ -74,7 +74,6 @@ def _guid(s: str, IID_IBUFFER_FACTORY = _guid("71AF914D-C10F-484B-BC50-14BC623B3A27") IID_IBUFFER_BYTE_ACCESS = _guid("905A0FEF-BC53-11DF-8C49-001E4FC686DA") -# Delegate specialization IIDs from Windows.Devices.Bluetooth.Advertisement.h. IID_TYPED_EVENT_HANDLER_WATCHER_RECEIVED = _guid( "90EB4ECA-D465-5EA0-A61C-033C8C5ECEF2" ) @@ -95,11 +94,10 @@ def _guid(s: str, BLUETOOTH_CONNECTION_STATUS_CONNECTED = 1 -_IUNKNOWN_BYTES = bytes(IID_IUNKNOWN) -_IAGILE_BYTES = bytes(IID_IAGILE_OBJECT) +_IUNKNOWN_BYTES = _bguid("00000000-0000-0000-C000-000000000046") +_IAGILE_BYTES = _bguid("94EA2B94-E9CC-49E0-C0FF-EE64CA8F5B90") -# Reusable WINFUNCTYPE prototypes for COM vtable fields and delegate construction. _QI_FUNC = ctypes.WINFUNCTYPE( _c_long, _c_void_p, @@ -166,7 +164,6 @@ def __exit__(self, exc_type, exc, tb): def _vtbl_invoke(this_ptr: ctypes.c_void_p | int, index, restype, argtypes, *args): - # WINFUNCTYPE returns python int pointer instead of c_void_p addr = getattr(this_ptr, 'value', this_ptr) return _get_vtbl_fn_type(restype, argtypes)( _voidp_at(_voidp_at(addr).value + index * _SZ_VOIDP).value # type: ignore[operator] @@ -206,7 +203,6 @@ def _vtbl_invoke(this_ptr: ctypes.c_void_p | int, index, restype, argtypes, *arg _ro_activate_instance.argtypes = [_c_void_p, _PPVOID] _ro_activate_instance.restype = _c_long -# Cache function prototypes so _vtbl_invoke does not rebuild WINFUNCTYPE objects. _VTBL_FN_TYPE_CACHE: dict = {} @@ -219,7 +215,6 @@ def _get_vtbl_fn_type(restype: type, argtypes: tuple[type, ...]): return fn_type -# Common argtype tuples for _vtbl_invoke call sites. _ARG_PGUID_PPVOID = (_PGUID, _PPVOID) _ARG_OUT_INT = (_PINT,) _ARG_OUT_INT16 = (_PINT16,) @@ -232,6 +227,7 @@ def _get_vtbl_fn_type(restype: type, argtypes: tuple[type, ...]): _ARG_UINT64_OUT_VOIDP = (_c_uint64, _PPVOID) _ARG_CINT_OUT_VOIDP = (_c_int, _PPVOID) _ARG_GUID_OUT_VOIDP = (GUID, _PPVOID) +_ARG_GUID_CINT_OUT_VOIDP = (GUID, _c_int, _PPVOID) _ARG_VOIDP_OUT_VOIDP = (_c_void_p, _PPVOID) _ARG_VOIDP_CINT_OUT_VOIDP = (_c_void_p, _c_int, _PPVOID) _ARG_EVENT_TOKEN = (EventRegistrationToken,) @@ -370,7 +366,6 @@ def btle_statics_from_bluetooth_address_async( # IBluetoothLEDeviceStatics::Fro def btle_device6_request_throughput_params(device_ptr: ctypes.c_void_p) -> None: """Request ThroughputOptimized connection parameters (interval 12 = 15ms).""" - # QI Device6 first — absent on Win10 d6 = _c_void_p() if _vtbl_invoke(device_ptr, 0, _c_long, _ARG_PGUID_PPVOID, @@ -396,10 +391,10 @@ def btle_device6_request_throughput_params(device_ptr: ctypes.c_void_p) -> None: release_ptr(d6) -def btle_device3_get_gatt_services_for_uuid_async(ptr, service_uuid): # IBluetoothLEDevice3::GetGattServicesForUuidAsync [10] +def btle_device3_get_gatt_services_for_uuid_async(ptr, service_uuid): # IBluetoothLEDevice3::GetGattServicesForUuidWithCacheModeAsync [11] op = _c_void_p() - hr = _vtbl_invoke(ptr, 10, _c_long, _ARG_GUID_OUT_VOIDP, service_uuid, _byref(op)) - _check_hresult(hr, "BTLEDevice3.GetGattServicesForUuidAsync") + hr = _vtbl_invoke(ptr, 11, _c_long, _ARG_GUID_CINT_OUT_VOIDP, service_uuid, _c_int(1), _byref(op)) + _check_hresult(hr, "BTLEDevice3.GetGattServicesForUuidWithCacheModeAsync") return op diff --git a/dm40/nanowinbt/radio.py b/shared/nanowinbt/radio.py similarity index 93% rename from dm40/nanowinbt/radio.py rename to shared/nanowinbt/radio.py index 2f44826..67e43f5 100644 --- a/dm40/nanowinbt/radio.py +++ b/shared/nanowinbt/radio.py @@ -8,8 +8,6 @@ class NanoRadioStateError(RuntimeError): _INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value _RADIO_FUNCS = None -# byref holds a strong ref to the c_ulong via PyCArgObject.obj (Py_NewRef); -# safe to cache at module level — verified against CPython 3.10–3.14 source. _FIND_RADIO_PARAMS_PTR = ctypes.byref(ctypes.c_ulong(4)) # sizeof(BLUETOOTH_FIND_RADIO_PARAMS) diff --git a/dm40/nanowinbt/scanner.py b/shared/nanowinbt/scanner.py similarity index 98% rename from dm40/nanowinbt/scanner.py rename to shared/nanowinbt/scanner.py index e747b8d..8238e20 100644 --- a/dm40/nanowinbt/scanner.py +++ b/shared/nanowinbt/scanner.py @@ -1,8 +1,8 @@ from .. import mini_asyncio as asyncio -from . import ctypes_winrt as w +from ..nanowinbt import ctypes_winrt as w from .ctypes_com import RoSession from .radio import ensure_bluetooth_radio_on, get_bluetooth_radio_state -from dm40.types import NanoBLEDevice +from shared.types import NanoBLEDevice class NanoScanner: diff --git a/dm40/theme_store.py b/shared/theme_store.py similarity index 96% rename from dm40/theme_store.py rename to shared/theme_store.py index 8b5117c..c7daa76 100644 --- a/dm40/theme_store.py +++ b/shared/theme_store.py @@ -1,4 +1,4 @@ -from dm40.types import ThemePalette +from shared.types import ThemePalette _COLORS_BLOCK = 7 * 10 diff --git a/dm40/types.py b/shared/types.py similarity index 100% rename from dm40/types.py rename to shared/types.py diff --git a/utils/theme_store_builder.py b/utils/theme_store_builder.py index 2a98e92..cad85b0 100644 --- a/utils/theme_store_builder.py +++ b/utils/theme_store_builder.py @@ -60,7 +60,7 @@ def serialize_theme_store(themes: list[tuple[str, ...]]) -> bytes: def main() -> None: try: - from dm40.theme_store import deserialize_theme_store_palettes + from shared.theme_store import deserialize_theme_store_palettes except ModuleNotFoundError as exc: if __package__ in (None, ""): raise SystemExit(