diff --git a/Under Construction/Network.app/Network b/Under Construction/Network.app/Network index 83b728e0..d89cc15c 100755 --- a/Under Construction/Network.app/Network +++ b/Under Construction/Network.app/Network @@ -12,13 +12,179 @@ # https://docs.freebsd.org/en/books/handbook/advanced-networking/#network-wireless -from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMenu, QAction, QHBoxLayout, QGroupBox, QSlider, QWidget, \ - QActionGroup, QDesktopWidget, QMessageBox, QInputDialog, QLineEdit -from PyQt5.QtGui import QIcon, QPixmap, QCursor -from PyQt5.QtCore import Qt, QProcess, QMetaObject, QCoreApplication, QEvent, QObject, QTimer, QPoint, QTimer -from subprocess import Popen, check_output -import sys, os, re, subprocess -import re +from ctypes import cast, POINTER, c_uint32 +from PyQt5.sip import voidptr +from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMenu, QAction, QHBoxLayout, QWidget, QActionGroup, \ + QMessageBox, QLineEdit, QStyle, QStyleOptionMenuItem, QLabel, QWidgetAction, QProxyStyle, QDialog, \ + QGridLayout, QLayout, QCheckBox, QPushButton, QSizePolicy +from PyQt5.QtGui import QIcon, QPixmap, QCursor, QPainter, QPaintEvent, QImage, qGray, qAlpha, qRgba +from PyQt5.QtCore import Qt, QProcess, QObject, QTimer, QSize, QFileSystemWatcher +from collections import namedtuple +import sys, os, re, netifaces + + +def run_command_and_get_output(command: str, args, process = QProcess()) -> str: + process.setProgram(command) + process.setArguments(args) + print(process.program() + " " + " ".join(process.arguments())) + process.start() + process.waitForFinished() + return str(process.readAllStandardOutput(), 'UTF-8') + + +def get_default_route_interface() -> str: + route_info = run_command_and_get_output("route", ["-n", "get", "default"]) + interface_index = route_info.find("interface: ") + default_route_interface = "" + + if interface_index != -1: + default_route_interface = route_info[interface_index + 11:route_info.find("\n", interface_index)] + + return default_route_interface + + +WIRED_CONNECTED = 0 +WIRED_DISCONNECTED = 1 +WIRED_OFFLINE = 2 + + +def get_wired_status(interface_name: str) -> int: + addresses: dict = netifaces.ifaddresses(interface_name) + + if netifaces.AF_INET in addresses: + wired_status = WIRED_CONNECTED + else: + wired_status = WIRED_DISCONNECTED + + return wired_status + + +def get_pixmap_for_wired_status(status: int) -> QPixmap: + if status == WIRED_CONNECTED: + icon_name = "network-wired-symbolic" + elif status == WIRED_DISCONNECTED: + icon_name = "network-wired-disconnected-symbolic" # Same as "network-wired-offline-symbolic" + else: + icon_name = "network-wired-no-route-symbolic" + + return get_tray_icon_with_color(icon_name) + + +def get_pixmap_for_wpa_status(status: str, signal_level: int) -> QPixmap: + # https://w1.fi/wpa_supplicant/devel/defs_8h.html#a4aeb27c1e4abd046df3064ea9756f0bca6304c99164cf51fa8baecf8b124c6117 + if status == "SCANNING" or status == "ASSOCIATING": + icon_name = "network-wireless-acquiring-symbolic" + elif status == "COMPLETED": + # https://github.com/maxatome/wifimgr/blob/ca9951f0c08a72ad3f36c98a970414219d5b9b03/src/wifimgr-gtk.c#L1312 + if signal_level > 80: + icon_name = "network-wireless-signal-excellent-symbolic" + elif signal_level > 60: + icon_name = "network-wireless-signal-good-symbolic" + elif signal_level > 40: + icon_name = "network-wireless-signal-ok-symbolic" + elif signal_level > 20: + icon_name = "network-wireless-signal-weak-symbolic" + elif signal_level > 0: + icon_name = "network-wireless-signal-none-symbolic" + else: + icon_name = "network-wireless-signal-none-symbolic" + elif status == "INTERFACE_DISABLED": + icon_name = "network-wireless-no-route-symbolic" + else: + icon_name = "network-wireless-offline-symbolic" + + return get_tray_icon_with_color(icon_name) + + +def get_tray_icon_with_color(icon_name: str) -> QPixmap: + icon = QIcon.fromTheme(icon_name) + pixmap: QPixmap = icon.pixmap(QSize(16, 16)) + image = pixmap.toImage() + # Use 100 on this icon set for a gray color that looks similar to the rest of the icons on the system tray + return QPixmap.fromImage(set_monochrome_image_color(image, 100)) + + +def is_usb_tethered_interface(interface_name: str) -> bool: + if "ue" in interface_name: + number = interface_name[-1] + output = run_command_and_get_output("sysctl", ["net.ue.%s.%%parent" % number]) + parent = output[output.rfind(":") + 2:].strip() + return parent[0:-1] in ["urndis", "cdce", "ipheth"] + + return False + + +def is_wired_interface_plugged_in(interface_name: str) -> bool: + output = run_command_and_get_output("ifconfig", ["-v", interface_name]) + + # Some interfaces (like the ones with the "urndis" driver) don't display status, media and parent interface + # rows (I assume this is the same for any USB Tethered interface), but unlike USB Ethernet adapters, + # if they are present that means that they are really "plugged in". + return "status: active" in output or is_usb_tethered_interface(interface_name) + + +# https://github.com/helloSystem/Menu/blob/c13548d3866c3896d728d2388bd0d4cd636a3a91/plugin-statusnotifier/statusnotifierbutton.cpp#L108 +def set_monochrome_image_color(src_image: QImage, rgb_value: int = -1) -> QImage: + image: QImage = src_image.convertToFormat(QImage.Format_ARGB32 if src_image.hasAlphaChannel() else QImage.Format_RGB32) + pointer: voidptr = image.bits() + pixel_array = cast(pointer.__int__(), POINTER(c_uint32)) + pixel_count = image.width() * image.height() + + for i in range(pixel_count): + pixel = pixel_array.__getitem__(i) + + value = rgb_value + if rgb_value == -1: + value = qGray(pixel) + + value = qRgba(value, value, value, qAlpha(pixel)) + pixel_array.__setitem__(i, value) + + return image + + +# Only need this for now +NetworkInfo = namedtuple("NetworkInfo", ["flags", "signal_level"]) + + +def get_network_info(bssid: str) -> NetworkInfo: + lines = run_command_and_get_output("wpa_cli", ["bss", bssid]).strip().splitlines() + + signal_strength_dbm = 0 + signal_noise_dbm = 0 + flags = "" + for line in lines: + if line.startswith("noise="): + signal_noise_dbm = int(line.split("=")[1]) + elif line.startswith("level="): + signal_strength_dbm = int(line.split("=")[1]) + elif line.startswith("flags="): + flags = line.split("=")[1] + + # https://github.com/maxatome/wifimgr/blob/ca9951f0c08a72ad3f36c98a970414219d5b9b03/src/wifimgr.c#L848 + return NetworkInfo(flags, (signal_strength_dbm - signal_noise_dbm) * 4) + + +def request_network_password(dialog_text: str) -> tuple[str, bool]: + dialog = PasswordRequestDialog(dialog_text) + dialog.setWindowTitle("Wireless network password") + + ok, password = dialog.get_password() + if not ok: + print("User did not click OK in password dialog") + + return password, ok + + +def get_gateway_for_interface(interface_name) -> str: + lines = run_command_and_get_output("/sbin/resolvconf", ["-l", interface_name]).strip().splitlines() + gateway = "" + + if lines: + gateway = lines[1].split(' ')[1] + + return gateway + class NetworkMenu(QObject): @@ -26,15 +192,13 @@ class NetworkMenu(QObject): super().__init__() - # self.showTODO("It can show wireless networks but not connect to them. Do you know how to fix this?") self.showTODO() - # icon = QIcon(os.path.dirname(os.path.abspath(__file__)) + "/Resources/wireless.svg") # Does not seem to work - # TODO: Change the icon depending on signal strength; see /usr/local/share/icons/elementary-xfce/status/symbolic/network-wireless-* self.tray = QSystemTrayIcon() self.tray.setVisible(True) self.menu = QMenu() + self.wirelessGroup = QActionGroup(self.menu) # Only one of the actions added to this group can be active self.tray.setContextMenu(self.menu) self.tray.activated.connect(self.show_menu) @@ -44,20 +208,29 @@ class NetworkMenu(QObject): # Sneaky PyQt quirk! A reference to the actions must be kept around or the actions will be destroyed self.actions = [] self.sliderWindow = None + self.check_network_password = False + self.status_lines = "" self.updateStatus() self.timer = QTimer() - self.timer.setInterval(3000) # Every 3 seconds + self.timer.setInterval(3000) # Every 3 seconds self.timer.timeout.connect(self.updateStatus) self.timer.start() - self.refreshMenu() # Initially populate the menu + log_file_watcher = QFileSystemWatcher(["/var/log/messages"], self) + log_file_watcher.fileChanged.connect(self.read_log_file) + + with open("/var/log/messages", 'rb') as file: + self.log_file_position = file.seek(0, os.SEEK_END) + + self.refreshMenu() # Initially populate the menu self.tray.installEventFilter(self) # FIXME: This never seems to get called, why? self.installEventFilter(self) # FIXME: This never seems to get called, why? - def eventFilter(self, obj, event): - print("eventFilter function running") # FIXME: Why is this never called when the icon is right-clicked? + def eventFilter(self, obj, event) -> bool: + print("eventFilter function running") # FIXME: Why is this never called when the icon is right-clicked? # We need to refresh the contents of the right-click menu somehow when the user right-clicks... + return super().eventFilter(obj, event) def show_menu(self, reason): self.updateMenu() @@ -70,10 +243,9 @@ class NetworkMenu(QObject): # self.menu.move(QCursor.pos()) # self.menu.show() # When called like this, it appears as a context menu over the menu bar, which is not desired # self.menu.activateWindow() # Needed on some systems to make the menu go away when clicked anywhere else but in the menu? - self.menu.popup(QCursor.pos()) # When called like this, it appears almost at the correct location but with the wrong font size, + self.menu.popup(QCursor.pos()) # When called like this, it appears almost at the correct location but with the wrong font size, # as if it was a context menu rather than a real menu; probably because somehow its parent now is not the global menu bar main window? - def refreshMenu(self): self.actions = [] # Get the networks from wpa_cli @@ -87,88 +259,151 @@ class NetworkMenu(QObject): p.waitForFinished() def updateStatus(self): + bssid = "" + status = "" p = QProcess() - p.setProgram("wpa_cli") - p.setArguments(["status"]) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() + interfaces = run_command_and_get_output("ifconfig", ["-ul", "ether"], p).strip().split(' ') - self.status_lines = str(p.readAllStandardOutput(), 'utf-8').strip().split("\n") - + wlan_present = False + wired_status = -1 + if len(interfaces) == 0: + self.tray.setIcon(QIcon(get_pixmap_for_wired_status(WIRED_OFFLINE))) + + for i in interfaces: + # wpa_cli takes care of wireless interfaces + if "wlan" in i: + wlan_present = True + continue + if not is_wired_interface_plugged_in(i): + continue + + wired_status = get_wired_status(i) + + if not wlan_present: + self.tray.setIcon(QIcon(get_pixmap_for_wired_status(wired_status))) + return + + self.status_lines = run_command_and_get_output("wpa_cli", ["status"], p).strip().splitlines() print(self.status_lines) # Update the icon in the menu for element in self.status_lines: - if element.startswith("wpa_state="): + if element.startswith("bssid="): + bssid = element.split("=")[1] + elif element.startswith("wpa_state="): status = element.split("=")[1] break - try: - if status == "SCANNING": - self.tray.setIcon(QIcon.fromTheme("network-wireless-acquiring-symbolic")) - elif status == "COMPLETED": - self.tray.setIcon(QIcon.fromTheme("network-wireless-symbolic")) - # TODO: Set different icons based on signal_level - else: - self.tray.setIcon(QIcon.fromTheme("network-wireless-offline-symbolic")) - except: - pass - + + if status == "COMPLETED": + self.check_network_password = False + + if wired_status == WIRED_CONNECTED: + icon_pixmap = get_pixmap_for_wired_status(wired_status) + else: + icon_pixmap = get_pixmap_for_wpa_status(status, get_network_info(bssid).signal_level) + + self.tray.setIcon(QIcon(icon_pixmap)) + + if not self.menu.isHidden(): + for action in self.menu.actions(): + if hasattr(action, "bssid"): + network_info = get_network_info(getattr(action, "bssid")) + action.defaultWidget().set_signal_level(network_info.signal_level) + action.defaultWidget().set_password_protected("PSK" in network_info.flags) + elif hasattr(action, "wired_interface"): + wired_status = get_wired_status(getattr(action, "wired_interface")) + action.defaultWidget().set_wired_status(wired_status) + def updateMenu(self): # Find out whether we are connected to one of the networks self.updateStatus() + self.refreshMenu() self.menu.clear() # Second, show p = QProcess() - p.setProgram("wpa_cli") - p.setArguments(["scan_results"]) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() + # Get interfaces marked as "UP" but exclude those containing "wlan" + interfaces = [i for i in run_command_and_get_output("ifconfig", ["-ul", "ether"], p).strip().split(' ') + if "wlan" not in i and is_wired_interface_plugged_in(i)] + wired_interfaces_available = len(interfaces) > 0 and interfaces[0] != '' + default_route_interface = get_default_route_interface() + + if wired_interfaces_available: + action = QAction("Wired") + action.setDisabled(True) + self.actions.append(action) + self.menu.addAction(action) + + for i in interfaces: + if "ue" in i: + name = "USB Ethernet (%s)" + elif "lagg" in i: + name = "Link aggregation (%s)" + else: + name = "Ethernet (%s)" - lines = str(p.readAllStandardOutput(), 'utf-8').strip().split("\n") + name = name % i - self.wirelessGroup = QActionGroup(self.menu) # Only one of the actions added to this group can be active + action = QWidgetAction(self.menu) + action.__setattr__("wired_interface", i) + action.setCheckable(False) + action.setText(name) + + wired_status = get_wired_status(i) + item = NetworkItem(self.menu, action) + item.set_default_route_interface(i == default_route_interface) + item.set_wired_status(wired_status) + action.setDefaultWidget(item) + + self.actions.append(action) + self.menu.addAction(action) action = QAction("Wireless") action.setDisabled(True) self.actions.append(action) self.menu.addAction(action) + + lines = run_command_and_get_output("wpa_cli", ["scan_results"], p).strip().splitlines() - if len(lines) > 1: + if len(lines) > 2: ssids_added_to_menu = [] - for line in lines: - if line.startswith("Selected") or line.startswith("bssid"): - continue + for line in lines[2:]: print(line) # Parse out information for each network - regex = r"([^\ ]+)\t([^\ ]+)\t([^\ ]+)\t([^\ ]+)\t(.*)$" + regex = r"([^\ ]+)\t[^\ ]+\t[^\ ]+\t([^\ ]+)\t(.*)$" matches = re.findall(regex, line) if not matches: continue print(len(matches[0])) bssid = matches[0][0] - signal_level = int(matches[0][2]) - flags = matches[0][3] - label = matches[0][4] + flags = matches[0][1] + ssid = matches[0][2] + signal_level = get_network_info(bssid).signal_level - ssid = label + label = ssid if label == "" or label.startswith("\\x00"): - label = bssid # For networks with hidden ssid (network name) - action = QAction(line) + label = bssid # For networks with hidden ssid (network name) + action = QWidgetAction(self.menu) action.__setattr__("ssid", ssid) action.__setattr__("bssid", bssid) # if 'flags' in vars(): action.__setattr__("flags", flags) action.triggered.connect(self.switchNetwork) # lambda could be used to pass an argument but the argument passed is taken at the time when this executes, which is not what we want - action.setText(label) - if "ssid=" + ssid in self.status_lines: - action.setIcon(QIcon.fromTheme("network-wireless-symbolic")) action.setCheckable(True) - if "default" in line: + action.setText(label) + + item = NetworkItem(self.menu, action) + item.set_password_protected("PSK" in flags) + item.set_signal_level(signal_level) + action.setDefaultWidget(item) + + if "bssid=" + bssid in self.status_lines: action.setChecked(True) + # Only display the "internet" icon when there are wired interfaces + item.set_default_route_interface(wired_interfaces_available and "wlan" in default_route_interface) + + # TODO: Show networks with same SSID if ssid not in ssids_added_to_menu: self.actions.append(action) self.wirelessGroup.addAction(action) @@ -178,21 +413,17 @@ class NetworkMenu(QObject): self.menu.addSeparator() action = QAction("Rescan Networks") - # action.setDisabled(True) action.triggered.connect(self.refreshMenu) self.actions.append(action) self.menu.addAction(action) - action = QAction("Create Hotspot...") # TODO: To be implemented + action = QAction("Create Hotspot...") # TODO: To be implemented action.setDisabled(True) self.actions.append(action) self.menu.addAction(action) - action = QAction("Disconnect") # TODO: To be implemented - if "wpa_state=COMPLETED" in self.status_lines: - action.setDisabled(False) - else: - action.setDisabled(True) + action = QAction("Disconnect") + action.setEnabled("wpa_state=COMPLETED" in self.status_lines) action.triggered.connect(self.disconnect) self.actions.append(action) self.menu.addAction(action) @@ -205,124 +436,120 @@ class NetworkMenu(QObject): self.menu.addAction(action) def reconnect(self): - p = QProcess() - p.setProgram("wpa_cli") - p.setArguments(["reconnect"]) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() - output = str(p.readAllStandardOutput(), 'utf-8') - print(output) + print(run_command_and_get_output("wpa_cli", ["reconnect"])) def disconnect(self): - p = QProcess() - p.setProgram("wpa_cli") - p.setArguments(["disconnect"]) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() - output = str(p.readAllStandardOutput(), 'utf-8') - print(output) - self.tray.setIcon(QIcon.fromTheme("network-wireless-offline-symbolic")) + print(run_command_and_get_output("wpa_cli", ["disconnect"])) + # self.tray.setIcon(QIcon.fromTheme("network-wireless-offline-symbolic")) def switchNetwork(self, line): + # TODO: Support networks protected with EAP + self.check_network_password = False self.updateStatus() ssid = getattr(self.wirelessGroup.checkedAction(), "ssid") bssid = getattr(self.wirelessGroup.checkedAction(), "bssid") flags = getattr(self.wirelessGroup.checkedAction(), "flags") - if "PSK" in flags: - password, ok = QInputDialog.getText(None, "Wireless network password", "Please enter\ - the password for the %s network:" % ssid if ssid else bssid, - QLineEdit.Password) # TODO: Make OK only clickable when we have >= 8 characters - if not ok: - print("User did not click OK in password dialog") - return # Don't try to connect to a network if it has been cancelled. - - self.reconnect() - - # Get a byte string with wpa_cli's output and decode it + # self.reconnect() p = QProcess() - p.setProgram("wpa_cli") - # First, scan - p.setArguments(["list_networks"]) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() - output = str(p.readAllStandardOutput(), 'utf-8') - l = output.strip().splitlines() # Split that output into lines, ignoring the - if len(l) < 2: # useless ones. + #default_route_interface = get_default_route_interface() + + #if "wlan" not in default_route_interface: + # gateway = get_gateway_for_interface("wlan0") + # print(run_command_and_get_output("route", ["change", "default", gateway, "-ifp", "wlan0"])) + + # Get a byte string with wpa_cli's output and decode it + # Split that output into lines, ignoring the useless ones + output = run_command_and_get_output("wpa_cli", ["list_networks"], p) + network_list = output.strip().splitlines() + if len(network_list) < 2: # If this is called, something is wrong with wpa_cli. - self.showError("Could not connect to the network", """For some reaso\ -n wpa_cli doesn't seem to be working. + self.showError("Could not connect to the network", """For some reason wpa_cli doesn't seem to be working. Information for debuggers: -""" + o.strip()) +""" + output.strip()) return - isUsed = 0 - for line in l[2:]: # Ignore 'selected interface' message and column labels - parts = line.split() + is_used = False + for line in network_list[2:]: # Ignore 'selected interface' message and column labels + parts = line.split('\t') if parts[1] in [ssid, bssid]: - isUsed = 1 - n = parts[0] + key_mgmt = run_command_and_get_output("wpa_cli", ["get_network", parts[0], "key_mgmt"], + p).strip().splitlines()[1] - if isUsed: # TODO: We know we've already connected -- check why we are re-connecting + # Accept if the same authentication method is used, treat as different network if not. + # Not handling IEEE8021X + if ("WPA" in flags) == ("WPA" in key_mgmt): + is_used = True + self.network_id = parts[0] + break + + if is_used: # TODO: We know we've already connected -- check why we are re-connecting if "PSK" in flags: - p.setArguments(["set_network", str(n), "psk", '"' + password + '"']) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() - if "OK" in str(p.readAllStandardOutput(), 'utf-8'): - return # good - else: - self.showError("Could not connect to the network. Please check the password.") - return # bad - else: - p.setArguments(["add_network"]) - print(p.program() + " " + " ".join(p.arguments())) + self.check_network_password = True + + p.setArguments(["select_network", str(self.network_id)]) + p.start() + p.waitForFinished() + + p.setArguments(["save_config"]) p.start() p.waitForFinished() - e = str(p.readAllStandardOutput(), 'utf-8').strip().splitlines()[-1] - if not e.isnumeric(): - self.showError("Could not connect to the network.", "'%s' is not a number." % e) - print("'%s' is not a number." % e) + else: + output = run_command_and_get_output("wpa_cli", ["add_network"], p).strip().splitlines()[-1] + if not output.isnumeric(): + self.showError("Could not connect to the network.", "'%s' is not a number." % output) + print("'%s' is not a number." % output) return # bad try: - n = int(e) + self.network_id = int(output) except: self.showError("Could not connect to the network.", "\ We don't know why this happened.") print("returned non-zero exit code") - return # bad - p.setArguments(["set_network", str(n), "ssid" if ssid else "bssid", '"' + (ssid if ssid else bssid) + '"']) - print(p.program() + " " + " ".join(p.arguments())) - p.start() - p.waitForFinished() - out = str(p.readAllStandardOutput(), 'utf-8') - if "OK" not in out: + return # bad + + if ssid: + identifier = "ssid" + identifier_str = ssid + else: + identifier = "bssid" + identifier_str = bssid + + output = run_command_and_get_output( + "wpa_cli", + ["set_network", str(self.network_id), identifier, "\"%s\"" % identifier_str], + p) + + if "OK" not in output: self.showError("Could not connect to the network.", - """Information for debuggers: """ + out) + """Information for debuggers: """ + output) return # bad if "PSK" in flags: - p.setArguments(["set_network", str(n), "psk", '"' + password + '"']) + password, ok = request_network_password( + "