Skip to content
92 changes: 57 additions & 35 deletions main.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from bleak import discover
#!/usr/bin/env python3
from bleak import BleakScanner
from asyncio import new_event_loop, set_event_loop, get_event_loop
from time import sleep, time_ns
from time import sleep
from binascii import hexlify
from json import dumps
from sys import argv
from datetime import datetime
import sys
import asyncio

# Configure update duration (update after n seconds)
UPDATE_DURATION = 1
Expand All @@ -16,49 +19,63 @@
recent_beacons = []


def get_best_result(device):
recent_beacons.append({
def get_best_result(device, adv_data):
try:
from time import time_ns
except ImportError:
from datetime import datetime

def time_ns():
now = datetime.now()
return int(now.timestamp() * 1e9)

current_beacon = {
"time": time_ns(),
"device": device
})
"device": device,
"adv_data": adv_data
}
recent_beacons.append(current_beacon)
strongest_beacon = None
i = 0
while i < len(recent_beacons):
if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS):
recent_beacons.pop(i)
continue
if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi):
strongest_beacon = recent_beacons[i]["device"]
if (strongest_beacon == None or strongest_beacon["adv_data"].rssi < recent_beacons[i]["adv_data"].rssi):
strongest_beacon = recent_beacons[i]
i += 1

if (strongest_beacon != None and strongest_beacon.address == device.address):
strongest_beacon = device
if (strongest_beacon != None and strongest_beacon["device"].address == device.address):
strongest_beacon = current_beacon

return strongest_beacon
return strongest_beacon["adv_data"]


# Getting data with hex format
async def get_device():
# Scanning for devices
devices = await discover()
for d in devices:
discovered_devices_and_advertisement_data = await BleakScanner.discover(return_adv=True)
for device, adv_data in discovered_devices_and_advertisement_data.values():
# Checking for AirPods
d = get_best_result(d)
if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']:
data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))
data_length = len(hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER])))
adv_data = get_best_result(device, adv_data)
if adv_data.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in adv_data.manufacturer_data:
data_hex = hexlify(bytearray(adv_data.manufacturer_data[AIRPODS_MANUFACTURER]))
data_length = len(hexlify(bytearray(adv_data.manufacturer_data[AIRPODS_MANUFACTURER])))
if data_length == AIRPODS_DATA_LENGTH:
return data_hex
return False


# Same as get_device() but it's standalone method instead of async
def get_data_hex():
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
if sys.version_info < (3, 7):
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
else:
a = asyncio.run(get_device())
return a


Expand All @@ -84,6 +101,8 @@ def get_data():
model = "AirPods1"
elif chr(raw[7]) == 'a':
model = "AirPodsMax"
elif chr(raw[7]) == '4':
model = "AirPodsPro2"
else:
model = "unknown"

Expand Down Expand Up @@ -130,19 +149,22 @@ def is_flipped(raw):
def run():
output_file = argv[-1]

while True:
data = get_data()

if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)

sleep(UPDATE_DURATION)
try:
while True:
data = get_data()

if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)

sleep(UPDATE_DURATION)
except KeyboardInterrupt:
pass


if __name__ == '__main__':
Expand Down