66from datetime import datetime
77from time import sleep
88from pathlib import Path
9+ from typing import Dict , Any , Optional
910#
10- URL = 'https://location.services.mozilla.com/v1/geolocate?key=test'
11- NMCMD = ['nmcli' ,'-g' ,'SSID,BSSID,FREQ,SIGNAL' ,'device' ,'wifi' ] # Debian stretch, Ubuntu 18.04
12- NMLEG = ['nmcli' ,'-t' ,'-f' ,'SSID,BSSID,FREQ,SIGNAL' ,'device' ,'wifi' ] # ubuntu 16.04
13- NMSCAN = ['nmcli' ,'device' ,'wifi' ,'rescan' ]
14- HEADER = 'time lat lon accuracy NumBSSIDs'
11+ URL = 'https://location.services.mozilla.com/v1/geolocate?key=test'
12+ NMCMD = ['nmcli' , '-g' , 'SSID,BSSID,FREQ,SIGNAL' , 'device' , 'wifi' ] # Debian stretch, Ubuntu 18.04
13+ NMLEG = ['nmcli' , '-t' , '-f' , 'SSID,BSSID,FREQ,SIGNAL' , 'device' , 'wifi' ] # ubuntu 16.04
14+ NMSCAN = ['nmcli' , 'device' , 'wifi' , 'rescan' ]
15+ HEADER = 'time lat lon accuracy NumBSSIDs'
1516
1617# %%
17- def logwifiloc (T :float , logfile :Path ):
18+
19+
20+ def logwifiloc (T : float , logfile : Path ):
1821
1922 if logfile :
2023 logfile = Path (logfile ).expanduser ()
2124 with logfile .open ('a' ) as f :
2225 f .write (HEADER + '\n ' )
2326
24-
2527 print (f'updating every { T } seconds' )
2628 print (HEADER )
2729
2830 nm_config_check ()
29- sleep (0.5 ) # nmcli errored for less than about 0.2 sec.
31+ sleep (0.5 ) # nmcli errored for less than about 0.2 sec.
3032 while True :
3133 loc = get_nmcli ()
3234 if loc is None :
@@ -42,34 +44,37 @@ def logwifiloc(T:float, logfile:Path):
4244
4345 sleep (T )
4446
47+
4548def nm_config_check ():
46- # %% check that NetworkManager CLI is available and WiFi is active
49+ # %% check that NetworkManager CLI is available and WiFi is active
4750 try :
48- ret = subprocess .check_output (['nmcli' ,'-t' ,'radio' ,'wifi' ], universal_newlines = True , timeout = 1. ).strip ().split (':' )
51+ ret = subprocess .check_output (['nmcli' , '-t' , 'radio' , 'wifi' ], universal_newlines = True , timeout = 1. ).strip ().split (':' )
4952 except FileNotFoundError :
5053 raise OSError ('CUrrently this program relies on NetworkManager' )
5154
52- assert 'enabled' in ret and not 'disabled' in ret ,'must enable WiFi, perhaps via nmcli radio wifi on'
55+ assert 'enabled' in ret and 'disabled' not in ret , 'must enable WiFi, perhaps via nmcli radio wifi on'
5356
5457# %%
55- def get_nmcli () -> dict :
58+
59+
60+ def get_nmcli () -> Optional [Dict [str , Any ]]:
5661
5762 ret = subprocess .check_output (NMCMD , universal_newlines = True , timeout = 1. )
58- sleep (0.5 ) # nmcli errored for less than about 0.2 sec.
63+ sleep (0.5 ) # nmcli errored for less than about 0.2 sec.
5964 try :
60- subprocess .check_call (NMSCAN , timeout = 1. ) # takes several seconds to update, so do it now.
65+ subprocess .check_call (NMSCAN , timeout = 1. ) # takes several seconds to update, so do it now.
6166 except subprocess .CalledProcessError as e :
6267 logging .error (f'consider slowing scan cadence. { e } ' )
6368
6469 dat = pandas .read_csv (StringIO (ret ), sep = r'(?<!\\):' , index_col = False ,
6570 header = 0 , encoding = 'utf8' , engine = 'python' ,
66- dtype = str , usecols = [0 ,1 , 3 ],
67- names = ['ssid' ,'macAddress' ,'signalStrength' ])
71+ dtype = str , usecols = [0 , 1 , 3 ],
72+ names = ['ssid' , 'macAddress' , 'signalStrength' ])
6873# %% optout
6974 dat = dat [~ dat ['ssid' ].str .endswith ('_nomap' )]
7075# %% cleanup
71- dat ['ssid' ] = dat ['ssid' ].str .replace ('nan' ,'' )
72- dat ['macAddress' ] = dat ['macAddress' ].str .replace (r'\\:' ,':' )
76+ dat ['ssid' ] = dat ['ssid' ].str .replace ('nan' , '' )
77+ dat ['macAddress' ] = dat ['macAddress' ].str .replace (r'\\:' , ':' )
7378# %% JSON
7479 jdat = dat .to_json (orient = 'records' )
7580 jdat = '{ "wifiAccessPoints":' + jdat + '}'
@@ -79,21 +84,22 @@ def get_nmcli() -> dict:
7984 req = requests .post (URL , data = jdat )
8085 if req .status_code != 200 :
8186 logging .error (req .text )
82- return
87+ return None
8388 except requests .exceptions .ConnectionError as e :
8489 logging .error (f'no network connection. { e } ' )
85- return
90+ return None
8691# %% process MLS response
8792 jres = req .json ()
8893 loc = jres ['location' ]
8994 loc ['accuracy' ] = jres ['accuracy' ]
90- loc ['N' ] = dat .shape [0 ] # number of BSSIDs used
95+ loc ['N' ] = dat .shape [0 ] # number of BSSIDs used
9196 loc ['t' ] = datetime .now ()
9297
9398 return loc
9499# %%
95100
96- def csv2kml (csvfn :Path , kmlfn :Path ):
101+
102+ def csv2kml (csvfn : Path , kmlfn : Path ):
97103 from simplekml import Kml
98104
99105 """
@@ -108,7 +114,7 @@ def csv2kml(csvfn:Path, kmlfn:Path):
108114 dat = pandas .read_csv (csvfn , sep = ' ' , index_col = 0 , header = 0 )
109115
110116 t = dat .index .tolist ()
111- lla = dat .loc [:,['lon' ,'lat' ]].values
117+ lla = dat .loc [:, ['lon' , 'lat' ]].values
112118# %% write KML
113119 """
114120 http://simplekml.readthedocs.io/en/latest/geometries.html#gxtrack
@@ -118,7 +124,7 @@ def csv2kml(csvfn:Path, kmlfn:Path):
118124 kml = Kml (name = 'My Kml' )
119125 trk = kml .newgxtrack (name = 'My Track' )
120126 trk .newwhen (t ) # list of times. MUST be format 2010-05-28T02:02:09Z
121- trk .newgxcoord (lla .tolist ()) # list of lon,lat,alt, NOT ndarray!
127+ trk .newgxcoord (lla .tolist ()) # list of lon,lat,alt, NOT ndarray!
122128
123129# just a bunch of points
124130# for i,p in enumerate(lla): # iterate over rows
0 commit comments