From dce0661359eeec82ff8f781ba1bee1288b199286 Mon Sep 17 00:00:00 2001 From: Serein_Y Date: Wed, 29 Apr 2026 13:57:16 +0800 Subject: [PATCH 1/2] Update: TN160 User Page --- projects/app_thermal160_camera/main.py | 314 ++++++++++++++++++------- 1 file changed, 231 insertions(+), 83 deletions(-) diff --git a/projects/app_thermal160_camera/main.py b/projects/app_thermal160_camera/main.py index c9c4ae51..09b5a7f6 100644 --- a/projects/app_thermal160_camera/main.py +++ b/projects/app_thermal160_camera/main.py @@ -1,4 +1,5 @@ import logging +import struct import time from typing import Tuple @@ -10,9 +11,16 @@ PMOD_W = 160 PMOD_H = 120 -FRAME_SIZE = PMOD_W * PMOD_H +FRAME_PIXEL_SIZE = PMOD_W * PMOD_H +# 物理帧尾 30 字节(与 PC 端 thermocam_gui.exe protocol.py 的 FRAME_SIZE +# = 19231 一致:1 引导 FF + 19200 像素 + 30 telemetry)。本文件只解析前 +# 6 字节 (vtemp/t_lo/t_hi),但 FRAME_SIZE 必须按物理长度算,否则下一帧 +# 紧挨校验会落在 NTC 字节上把每帧都误杀。 +FRAME_TAIL_SIZE = 30 +FRAME_SIZE = FRAME_PIXEL_SIZE + FRAME_TAIL_SIZE SKIP_COUNT = 10 CMAP = True # 渲染管线分支路由开关:True为热成像伪彩映射,False为原生灰度零拷贝 +INT16_MAX = 0x7FFF # 配置常量 BAUDRATE_INIT = 2000000 @@ -21,9 +29,13 @@ EMA_ALPHA = 0.2 SERIAL_TIMEOUT = 1 FONT_SCALE = 2.0 +CENTER_TEMP_SCALE = 1.1 +CORNER_TEMP_SCALE = 0.9 # UART 配置 -UART_BUFFER_SIZE = 4096 # 读取缓冲区大小 +# 单次 read 容量大于一帧(19220 B),让每帧只触发 1 次 serial.read 而非 ~5 +# 次,消除多次系统调用的调度开销,是这条流水线最显著的 FPS 瓶颈之一。 +UART_BUFFER_SIZE = 32768 # 读取缓冲区大小 RETRY_DELAY = 0.1 # 重试延迟(秒) # UI 配置 @@ -35,14 +47,33 @@ # 配置日志 logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s' + format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) + +def pixels_to_temp(pixels: np.ndarray, t_lo: float, t_hi: float) -> np.ndarray: + if t_hi <= t_lo: + return np.full_like(pixels, t_lo, dtype=np.float32) + norm = (pixels.astype(np.float32) / 254.0).clip(0.0, 1.0) + return (t_lo + (t_hi - t_lo) * norm).astype(np.float32) + + +def draw_cross(img_disp: image.Image, x: int, y: int, color, size: int = 12) -> None: + img_disp.draw_line(x - size, y, x + size, y, color) + img_disp.draw_line(x, y - size, x, y + size, color) + + +def draw_label(img_disp: image.Image, x: int, y: int, text: str, color, scale: float = 0.8) -> None: + img_disp.draw_string(x + 1, y + 1, text, image.COLOR_BLACK, scale=scale) + img_disp.draw_string(x, y, text, color, scale=scale) + + def is_in_button(x: int, y: int, btn_pos: Tuple[int, int, int, int]) -> bool: return (btn_pos[0] < x < btn_pos[0] + btn_pos[2] and btn_pos[1] < y < btn_pos[1] + btn_pos[3]) + def get_back_btn_img(width: int) -> Tuple[image.Image, int, int]: ret_width = int(width * BACK_BUTTON_WIDTH_RATIO) img_back = image.load(DEFAULT_ICON_PATH) @@ -52,9 +83,10 @@ def get_back_btn_img(width: int) -> Tuple[image.Image, int, int]: if h % 2 != 0: h += 1 img_back = img_back.resize(w, h) - img_back = img_back.rotate(180) + img_back = img_back.rotate(0) return img_back, w, h + class HardwareHAL: """Hardware Abstraction Layer for thermal camera device management. @@ -79,23 +111,23 @@ def serial_port(cls) -> str: cls._device = dn return port + def main() -> None: disp = display.Display() - disp.set_hmirror(True) - disp.set_vflip(False) + disp.set_hmirror(False) + disp.set_vflip(True) ts = touchscreen.TouchScreen() img_back, img_back_w, img_back_h = get_back_btn_img(disp.width()) back_rect = [0, 0, img_back_w, img_back_h] - # CMAP 按钮(动态更新,延迟初始化) img_cmap: image.Image = None cmap_rect = [ disp.width() - int(disp.width() * BACK_BUTTON_WIDTH_RATIO), disp.height() - 0, 1, 1, - ] # 占位,后续用 init_cmap_btn 填充 + ] def init_cmap_btn(label: str) -> None: nonlocal img_cmap, cmap_rect @@ -106,7 +138,7 @@ def init_cmap_btn(label: str) -> None: text_y = (h - char_h) // 2 text_x = max(2, (w - len(label) * 6) // 2) img_cmap.draw_string(text_x, text_y, label, image.COLOR_WHITE, scale=0.6) - img_cmap = img_cmap.rotate(180) + img_cmap = img_cmap.rotate(0) cmap_rect = [disp.width() - w, disp.height() - h, w, h] init_cmap_btn("CMAP") @@ -125,13 +157,12 @@ def init_cmap_btn(label: str) -> None: try: if HardwareHAL._device == "MaixCAM2": - serial.write(b'\x44') + serial.write(b"\x44") serial.close() time.sleep(0.1) serial = uart.UART(port=port_name, baudrate=BAUDRATE_HIGH) def _safe_colormap(name: str, fallback_id: int): - """安全获取 colormap ID,不支持时返回 None。""" cp = getattr(cv2, name, fallback_id) test = np.arange(256, dtype=np.uint8).reshape(1, 256) try: @@ -140,13 +171,16 @@ def _safe_colormap(name: str, fallback_id: int): except Exception: return None + # TURBO 放首位作为默认:中段为绿/黄,均匀场景噪声映射过去是自然 + # 过渡色,从根本上消除 HOT 默认下的「静置偏红」。HOT/COOL 等仍在列 + # 表里,CMAP 按键可循环切换,用户需要 HOT 风格随时可切回。 cmap_options = [] for _name, _fid in [ + ("COLORMAP_TURBO", 20), ("COLORMAP_HOT", 0), ("COLORMAP_COOL", 1), ("COLORMAP_DEEPGREEN", 15), ("COLORMAP_MAGMA", 13), - ("COLORMAP_TURBO", 20), ]: entry = _safe_colormap(_name, _fid) if entry: @@ -174,16 +208,22 @@ def redraw_cmap_btn() -> None: lut = get_cv2_lut(cmap_idx) color_buf = np.zeros((PMOD_H, PMOD_W, 3), dtype=np.uint8) - # 预分配显示缓冲区,避免循环内频繁创建对象导致撕裂 disp_w, disp_h = disp.width(), disp.height() - disp_buffer = np.zeros((disp_h, disp_w, 3), dtype=np.uint8) + # 双缓冲:两个物理缓冲轮替。当前帧写 buffer[i],disp.show 异步读取 + # 的同时下一帧写 buffer[i^1],不会与 DMA 读竞争。配合下面的 + # copy=False,省掉 cv2image 每帧~900KB 的内存拷贝。 + disp_buffers = [ + np.zeros((disp_h, disp_w, 3), dtype=np.uint8), + np.zeros((disp_h, disp_w, 3), dtype=np.uint8), + ] + db_idx = 0 buffer = bytearray() skip = 0 frame_count = 0 frame_timestamps = [] - fps_ema = 0.0 + fps_ema = 0.0 logger.info( "System: Data pump and rendering pipeline successfully initialized." @@ -201,9 +241,12 @@ def redraw_cmap_btn() -> None: lut = get_cv2_lut(cmap_idx) redraw_cmap_btn() logger.info(f"System: Switched to colormap {cmap_options[cmap_idx][0]}") - time.sleep(0.2) # 防抖 + time.sleep(0.2) + + chunk = serial.read(UART_BUFFER_SIZE, SERIAL_TIMEOUT) + if chunk: + chunk = bytes(chunk) - chunk = serial.read(UART_BUFFER_SIZE, timeout=SERIAL_TIMEOUT) if not chunk and not capture_start: main_img = image.Image(disp.width(), disp.height(), image.Format.FMT_RGB888) main_img.draw_rect( @@ -217,87 +260,191 @@ def redraw_cmap_btn() -> None: main_img.draw_string( x_msg, y_msg, msg, image.COLOR_WHITE, scale=FONT_SCALE ) - # 方向:统一旋转 180 度,并绘制按钮 - main_img = main_img.rotate(180) - main_img.draw_image(disp.width() - img_back_w, disp.height() - img_back_h, img_back) + main_img.draw_image(0, 0, img_back) if img_cmap is not None: - main_img.draw_image(0, 0, img_cmap) + main_img.draw_image(disp_w - img_back_w, disp_h - img_back_h, img_cmap) disp.show(main_img) continue - if frame_count == 0: logger.info("System: First data chunk received.") buffer.extend(chunk) + # ---- Drain-to-latest 同步策略 ---- + # 同 PC 端 thermocam_gui.exe 的等价行为:解析阶段把缓冲里所有合 + # 法帧全部消费掉,但只保留 *最后一帧* 用于渲染,丢弃中间帧。 + # 单线程下 disp.show + cv2 流水线一旦落后,串口侧会堆积多帧; + # 旧版"每解析一帧就渲染一帧"会把延迟越拉越大,缓冲越拉越长, + # 既导致屏幕卡顿、也提高了伪 0xFF 假同步的命中率(就是 output/ + # 里那张半幅错乱图的成因)。drain-to-latest 让每外循环最多一 + # 次重渲染,CPU 富余、缓冲不堆积、误同步概率断崖下降。 + latest_frame_data = None + latest_t_lo_x10 = INT16_MAX + latest_t_hi_x10 = INT16_MAX while True: - idx = buffer.find(b'\xFF') + idx = buffer.find(b"\xFF") if idx == -1: buffer.clear() break + if len(buffer) - (idx + 1) < FRAME_SIZE: + if idx > 0: + del buffer[:idx] + break - if len(buffer) - (idx + 1) >= FRAME_SIZE: - frame_data = buffer[idx + 1 : idx + 1 + FRAME_SIZE] - - err_idx = frame_data.find(b'\xFF') - if err_idx != -1: - logger.warning( - f"Warning: Protocol violation! Unexpected 0xFF found in " - f"payload at offset {err_idx}. Resyncing..." - ) - del buffer[:idx + 1 + err_idx] + frame_data = bytes(buffer[idx + 1: idx + 1 + FRAME_SIZE]) + + # 帧对齐校验走 telemetry 合理性,而非"像素区内有 0xFF 就 + # 重同步"——后者会把固件 FFC 瞬态/饱和像素产生的合法 255 + # 也误判为失步。VTEMP 是 14-bit 大端(固件对每行累加前已 + # &0x3FFF),telemetry[0] 高 2 bit 恒为 0;t_lo/t_hi 是 + # °C×10 的物理量,落在合理区间且 t_hi≥t_lo。 + telemetry = frame_data[FRAME_PIXEL_SIZE:] + if telemetry[0] & 0xC0: + del buffer[:idx + 1] + continue + t_lo_x10 = struct.unpack_from(">h", telemetry, 2)[0] + t_hi_x10 = struct.unpack_from(">h", telemetry, 4)[0] + if t_lo_x10 != INT16_MAX and t_hi_x10 != INT16_MAX: + if not (-1000 <= t_lo_x10 <= 3200 and + -1000 <= t_hi_x10 <= 3200 and + t_hi_x10 >= t_lo_x10): + del buffer[:idx + 1] continue - if skip <= SKIP_COUNT: - skip += 1 + # 帧间紧挨校验:协议里相邻两帧背靠背,下一帧的 0xFF 必须正 + # 好出现在本帧消费完的位置。这条 magic 能筛掉"FF + telemetry + # 凑巧合理 + 像素区中段被 UART RX FIFO 短暂溢出/固件脏数据 + # 污染"的坏帧——这种坏帧在 drain-to-latest 下会持续显示一 + # 整个 drain 周期,比单帧渲染时显眼得多(即 output/ 里上 1/3 + # 真实图像 + 下 2/3 椒盐噪声那张图的成因)。 + # 缓冲不够长时跳过此校验,留到下一轮 drain 再判。 + next_ff_pos = idx + 1 + FRAME_SIZE + if len(buffer) > next_ff_pos and buffer[next_ff_pos] != 0xFF: + del buffer[:idx + 1] + continue + + # 通过校验,推进缓冲并记账 + del buffer[:idx + 1 + FRAME_SIZE] + frame_count += 1 + + if skip <= SKIP_COUNT: + skip += 1 + continue + + # 关键:只保留缓冲里最后一帧用于渲染,旧帧直接丢弃 + latest_frame_data = frame_data + latest_t_lo_x10 = t_lo_x10 + latest_t_hi_x10 = t_hi_x10 + + if latest_frame_data is None: + continue + + # ===== 渲染最新一帧(每外循环最多一次 disp.show)===== + t_lo_x10 = latest_t_lo_x10 + t_hi_x10 = latest_t_hi_x10 + gray_np = np.frombuffer(latest_frame_data[:FRAME_PIXEL_SIZE], dtype=np.uint8).reshape(PMOD_H, PMOD_W) + gray_np = cv2.flip(gray_np, -1) + has_temp = ( + t_lo_x10 != INT16_MAX and + t_hi_x10 != INT16_MAX and + t_hi_x10 > t_lo_x10 + ) + + # 选中当前帧要写入的物理缓冲。另一个缓冲此刻可能仍被 + # 上一帧的 disp.show DMA 读取,不会被本帧写动。 + disp_buffer = disp_buffers[db_idx] + if CMAP: + gray_blurred = cv2.GaussianBlur(gray_np, (3, 3), 1) + np.take(lut, gray_blurred, axis=0, out=color_buf) + cv2.resize(color_buf, (disp_w, disp_h), dst=disp_buffer, interpolation=cv2.INTER_LINEAR) + else: + gray_resized = cv2.resize(gray_np, (disp_w, disp_h), interpolation=cv2.INTER_LINEAR) + disp_buffer[:, :, 0] = gray_resized + disp_buffer[:, :, 1] = gray_resized + disp_buffer[:, :, 2] = gray_resized + + # 双缓冲保护下安全启用 copy=False,省掉每帧 ~900KB 的 + # cv2image 内部 memcpy。竞态由缓冲轮替天然隔离。 + img_disp = image.cv2image(disp_buffer, bgr=False, copy=False) + + center_x = disp_w // 2 + center_y = disp_h // 2 + draw_cross(img_disp, center_x, center_y, image.COLOR_WHITE, 14) + + if has_temp: + t_lo = t_lo_x10 / 10.0 + t_hi = t_hi_x10 / 10.0 + temp_img = pixels_to_temp(gray_np, t_lo, t_hi) + + min_idx = np.argmin(temp_img) + max_idx = np.argmax(temp_img) + min_y, min_x = np.unravel_index(min_idx, temp_img.shape) + max_y, max_x = np.unravel_index(max_idx, temp_img.shape) + sx = disp_w / PMOD_W + sy = disp_h / PMOD_H + + draw_cross(img_disp, int(max_x * sx), int(max_y * sy), image.COLOR_RED, 12) + draw_cross(img_disp, int(min_x * sx), int(min_y * sy), image.COLOR_BLUE, 12) + + center_temp = temp_img[PMOD_H // 2, PMOD_W // 2] + draw_label( + img_disp, + min(max(center_x + 18, 8), disp_w - 96), + min(max(center_y - 18, 8), disp_h - 24), + f"{center_temp:.1f}C", + image.COLOR_WHITE, + scale=CENTER_TEMP_SCALE, + ) + + text_x = 8 + text_y = disp_h - 82 + draw_label( + img_disp, + text_x, + text_y, + f"MAX {temp_img[max_y, max_x]:.1f}C", + image.COLOR_RED, + scale=CORNER_TEMP_SCALE, + ) + draw_label( + img_disp, + text_x, + text_y + 26, + f"MIN {temp_img[min_y, min_x]:.1f}C", + image.COLOR_BLUE, + scale=CORNER_TEMP_SCALE, + ) + draw_label( + img_disp, + text_x, + text_y + 52, + f"RNG {t_lo:.1f}~{t_hi:.1f}C", + image.COLOR_WHITE, + scale=0.85, + ) + + img_disp.draw_image(0, 0, img_back) + if img_cmap is not None: + img_disp.draw_image(disp_w - img_back_w, disp_h - img_back_h, img_cmap) + + capture_start = True + disp.show(img_disp) + db_idx ^= 1 # 切到另一个物理缓冲供下一帧写入 + + current_time = time.time() + frame_timestamps.append(current_time) + if len(frame_timestamps) > FPS_WINDOW_SIZE: + frame_timestamps.pop(0) + if len(frame_timestamps) > 1: + window_duration = frame_timestamps[-1] - frame_timestamps[0] + if window_duration > 0: + window_fps = ((len(frame_timestamps) - 1) / window_duration) + if fps_ema == 0.0: + fps_ema = window_fps else: - # 直接将帧数据转为 numpy 数组,避免创建 Image 对象 - gray_np = np.frombuffer(frame_data, dtype=np.uint8).reshape(PMOD_H, PMOD_W) - - if CMAP: - # 高斯模糊(使用 OpenCV 加速) - gray_blurred = cv2.GaussianBlur(gray_np, (3, 3), 1) - # 颜色映射到预分配缓冲区 - np.take(lut, gray_blurred, axis=0, out=color_buf) - # 直接缩放到显示缓冲区,避免中间对象创建 - cv2.resize(color_buf, (disp_w, disp_h), dst=disp_buffer, interpolation=cv2.INTER_LINEAR) - else: - # 灰度图:先缩放单通道,然后复制到三通道 - gray_resized = cv2.resize(gray_np, (disp_w, disp_h), interpolation=cv2.INTER_LINEAR) - disp_buffer[:,:,0] = gray_resized - disp_buffer[:,:,1] = gray_resized - disp_buffer[:,:,2] = gray_resized - - # 创建显示图像 - img_disp = image.cv2image(disp_buffer, bgr=False, copy=False) - img_disp.draw_image(disp_w - img_back_w, disp_h - img_back_h, img_back) - if img_cmap is not None: - img_disp.draw_image(0, 0, img_cmap) - - capture_start = True - disp.show(img_disp) - - current_time = time.time() - frame_timestamps.append(current_time) - if len(frame_timestamps) > FPS_WINDOW_SIZE: - frame_timestamps.pop(0) - if len(frame_timestamps) > 1: - window_duration = frame_timestamps[-1] - frame_timestamps[0] - if window_duration > 0: - window_fps = ((len(frame_timestamps) - 1) / window_duration) - if fps_ema == 0.0: - fps_ema = window_fps - else: - fps_ema = ((EMA_ALPHA * window_fps) + ((1.0 - EMA_ALPHA) * fps_ema)) - if frame_count % 10 == 0: - osd_text = f"FPS: {fps_ema:.2f}" - logger.info(osd_text) - - frame_count += 1 - buffer = buffer[idx + 1 + FRAME_SIZE:] - else: - if idx > 0: - buffer = buffer[idx:] - break + fps_ema = ((EMA_ALPHA * window_fps) + ((1.0 - EMA_ALPHA) * fps_ema)) + if frame_count % 10 == 0: + osd_text = f"FPS: {fps_ema:.2f}" + logger.info(osd_text) except Exception as e: logger.error(f"Fatal: Unhandled pipeline exception: {str(e)}") @@ -307,5 +454,6 @@ def redraw_cmap_btn() -> None: serial.close() logger.info("System: UART resource securely released via interrupt vector.") + if __name__ == "__main__": main() From d64d82ef2191ef0fb50aea775d4f9ea6d04cc5d9 Mon Sep 17 00:00:00 2001 From: Serein_Y Date: Wed, 29 Apr 2026 14:24:13 +0800 Subject: [PATCH 2/2] Update: TN160 displays temperature data. --- projects/build_all.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/build_all.sh b/projects/build_all.sh index 6f117d84..f3d0d462 100755 --- a/projects/build_all.sh +++ b/projects/build_all.sh @@ -7,7 +7,7 @@ set -e # 定义不同平台的黑名单 blacklist_linux=() -blacklist_maixcam=("app_yoloworld" "app_vlm" "app_mono_depth_estimation" "app_chat" +blacklist_maixcam=("app_yoloworld" "app_vlm" "app_mono_depth_estimation" "app_chat" "app_thermal160_camera" "app_speech" "app_thermal256_camera" "app_image_generation" "app_thermal256_nightvision") blacklist_maixcam2=()