diff --git a/examples/read_firmware_version.py b/examples/read_firmware_version.py index 3e5f031..8bb929f 100644 --- a/examples/read_firmware_version.py +++ b/examples/read_firmware_version.py @@ -29,4 +29,5 @@ def characteristic_value_updated(self, characteristic, value): device = AnyDevice(manager=manager, mac_address=args.mac_address) device.connect() + manager.run() diff --git a/gatt/__init__.pyc b/gatt/__init__.pyc new file mode 100644 index 0000000..67f78de Binary files /dev/null and b/gatt/__init__.pyc differ diff --git a/gatt/errors.pyc b/gatt/errors.pyc new file mode 100644 index 0000000..4d37f76 Binary files /dev/null and b/gatt/errors.pyc differ diff --git a/gatt/gatt.pyc b/gatt/gatt.pyc new file mode 100644 index 0000000..0e9dec2 Binary files /dev/null and b/gatt/gatt.pyc differ diff --git a/gatt/gatt_linux.py b/gatt/gatt_linux.py index 5fdb9d7..44b31f5 100644 --- a/gatt/gatt_linux.py +++ b/gatt/gatt_linux.py @@ -63,6 +63,7 @@ def run(self): This call blocks until you call `stop()` to stop the main loop. """ + if self._main_loop: return @@ -86,7 +87,7 @@ def disconnect_signals(): self._properties_changed_signal.remove() self._interface_added_signal.remove() - self._main_loop = GObject.MainLoop() + self._main_loop = GObject.MainLoop.new(None, False) # modification to launch run in a thread try: self._main_loop.run() disconnect_signals() @@ -605,6 +606,7 @@ def write_value(self, value, offset=0): :param value: array of bytes to be written :param offset: offset from where to start writing the bytes (defaults to 0) """ + print("Sent : ", value) bytes = [dbus.Byte(b) for b in value] try: @@ -640,6 +642,7 @@ def enable_notifications(self, enabled=True): Each time when the device notifies a new value, `characteristic_value_updated()` of the related device will be called. """ + print("enable_notifications called") try: if enabled: self._object.StartNotify( diff --git a/gatt/gatt_linux.pyc b/gatt/gatt_linux.pyc new file mode 100644 index 0000000..56c1faf Binary files /dev/null and b/gatt/gatt_linux.pyc differ diff --git a/gattctl.py b/gattctl.py index 9f57947..2acf672 100755 --- a/gattctl.py +++ b/gattctl.py @@ -1,7 +1,12 @@ #!/usr/bin/env python3 from argparse import ArgumentParser +from threading import Thread +import keyboard +from termios import tcflush, TCIFLUSH import gatt +import time +import sys device_manager = None @@ -11,13 +16,28 @@ class AnyDeviceManager(gatt.DeviceManager): An implementation of ``gatt.DeviceManager`` that discovers any GATT device and prints all discovered devices. """ + def __init__(self, adapter_name): + super().__init__(adapter_name) + self.nb_device_connected = 0 + self.discover_filter = 'True' def device_discovered(self, device): - print("[%s] Discovered, alias = %s" % (device.mac_address, device.alias())) + if self.discover_filter == 'True': + print("[%s] Discovered, alias = %s" % (device.mac_address, device.alias())) + elif self.discover_filter in device.alias() or self.discover_filter in device.mac_address: + print("[%s] Discovered, alias = %s" % (device.mac_address, device.alias())) def make_device(self, mac_address): return AnyDevice(mac_address=mac_address, manager=self) + def run(self): + print("Running Manager") + super().run() + + def quit(self): + print("Stopping Manager") + super().stop() + class AnyDevice(gatt.Device): """ @@ -28,6 +48,9 @@ class AnyDevice(gatt.Device): def __init__(self, mac_address, manager, auto_reconnect=False): super().__init__(mac_address=mac_address, manager=manager) self.auto_reconnect = auto_reconnect + self.message = '' + self.transmition = "" + self.reception = "" def connect(self): print("Connecting...") @@ -35,6 +58,7 @@ def connect(self): def connect_succeeded(self): super().connect_succeeded() + device_manager.nb_device_connected += 1 print("[%s] Connected" % (self.mac_address)) def connect_failed(self, error): @@ -42,6 +66,8 @@ def connect_failed(self, error): print("[%s] Connection failed: %s" % (self.mac_address, str(error))) def disconnect_succeeded(self): + if device_manager.nb_device_connected: + device_manager.nb_device_connected -= 1 super().disconnect_succeeded() print("[%s] Disconnected" % (self.mac_address)) @@ -55,7 +81,48 @@ def services_resolved(self): for service in self.services: print("[%s] Service [%s]" % (self.mac_address, service.uuid)) for characteristic in service.characteristics: + if characteristic.uuid == '6e400003-b5a3-f393-e0a9-e50e24dcca9e': + characteristic.enable_notifications() + print("[%s] Characteristic [%s]" % (self.mac_address, characteristic.uuid)) + print("[%s] Resolved services" % (self.mac_address)) + + device_service = next( + s for s in self.services + if s.uuid == '6e400001-b5a3-f393-e0a9-e50e24dcca9e') + + self.reception = next( + c for c in device_service.characteristics + if c.uuid == '6e400003-b5a3-f393-e0a9-e50e24dcca9e') + + self.transmition = next( + c for c in device_service.characteristics + if c.uuid == '6e400002-b5a3-f393-e0a9-e50e24dcca9e') + + if self.message: + bar = bytearray(self.message.encode()) + self.transmition.write_value(bar) + + time.sleep(2) + self.reception.read_value() + + def characteristic_value_updated(self, characteristic, value): + print("RX:", (value.decode("utf-8")).replace('\n', '')) + + def characteristic_read_value_failed(self, characteristic, error): + print("RX fail : ", error) + + def characteristic_enable_notifications_succeeded(self, characteristic): + print("enable notification succeeded for characteristic ", characteristic.uuid) + + def characteristic_enable_notifications_failed(self, characteristic, error): + print("enable notification succeeded for characteristic ", characteristic.uuid, ", error, ", error) + + def characteristic_write_value_succeeded(self, characteristic): + print("write value succeeded for characteristic ", characteristic.uuid) + + def characteristic_write_value_failed(self, characteristic, error): + print("write value fail for characteristic ", characteristic.uuid, ", error, ", error) def main(): @@ -64,6 +131,11 @@ def main(): '--adapter', default='hci0', help="Name of Bluetooth adapter, defaults to 'hci0'") + arg_parser.add_argument( + '--message', + metavar='string', + type=str, + help="String send by ble, usable with --connect only. All 'n' of the message will be remplace by '\ n' ") arg_commands_group = arg_parser.add_mutually_exclusive_group(required=True) arg_commands_group.add_argument( '--power-on', @@ -81,6 +153,11 @@ def main(): '--discover', action='store_true', help="Lists all nearby GATT devices") + arg_commands_group.add_argument( + '--discover-filter', + metavar='filter', + type=str, + help="Lists all nearby GATT devices containing filter in their MAC address or alias") arg_commands_group.add_argument( '--connect', metavar='address', @@ -114,8 +191,14 @@ def main(): return if args.discover: device_manager.start_discovery() + if args.discover_filter: + print("discover filter : ", args.discover_filter) + device_manager.discover_filter = args.discover_filter + device_manager.start_discovery() elif args.connect: device = AnyDevice(mac_address=args.connect, manager=device_manager) + if args.message: + device.message = (args.message.replace('%n','\n')) device.connect() elif args.auto: device = AnyDevice(mac_address=args.auto, manager=device_manager, auto_reconnect=True) @@ -125,12 +208,28 @@ def main(): device.disconnect() return + thread = Thread(target=device_manager.run) + thread.start() + print("Terminate with Ctrl+C") + print("Started Thread.") + try: - device_manager.run() + while True: + if device_manager.nb_device_connected: + # If ESCAPE key is pressed you can start to write the message, ENTER to send it to the device + if keyboard.is_pressed(chr(27)): + tcflush(sys.stdin, TCIFLUSH) + message = bytearray(input(">> ").replace('%n','\n').encode()) + device.transmition.write_value(message) + + pass except KeyboardInterrupt: - pass - + print('KeyboardInterrupt!') + if device_manager.nb_device_connected: + device.disconnect() + device_manager.quit() + thread.join() if __name__ == '__main__': main()