1+ '''Module to to test the bias adjustment methods'''
2+ import sys
3+ import logging
4+ from typing import List
15import numpy as np
26import xarray as xr
3- import random
4-
5- import logging
6-
7- import sys
8- sys .path .append ('../../' )
9- from cmethods .CMethods import CMethods
10-
11- if True : # local space
12- import cmethods
13- print (cmethods .__file__ )
14-
15- formatter = logging .Formatter (
16- fmt = '%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s' ,
17- datefmt = '%Y-%m-%d %H:%M:%S' # %I:%M:%S %p AM|PM format
18- )
19-
20- logging .getLogger ().setLevel (logging .INFO )
21- screen_handler = logging .StreamHandler (stream = sys .stdout )
22- screen_handler .setFormatter (formatter )
23- logging .getLogger ().addHandler (screen_handler )
24- logging .getLogger ().setLevel (logging .INFO )
25- logging .getLogger ('requests' ).setLevel (logging .WARNING )
26- logging .getLogger ('urllib3' ).setLevel (logging .WARNING )
27-
28- def main () -> None :
29- np .random .seed (0 )
30- random .seed (0 )
31-
32- logging .info ('Prepare data sets.' )
7+ from sklearn .metrics import mean_squared_error
8+
9+ try :
10+ from cmethods .CMethods import CMethods
11+ except ModuleNotFoundError :
12+ print ('Using local module' )
13+ sys .path .append ('/Users/benjamin/repositories/awi-workspace/Bias-Adjustment-Python' )
14+ from cmethods .CMethods import CMethods
15+
16+ logging .basicConfig (
17+ format = '%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s' ,
18+ datefmt = '%Y/%m/%d %H:%M:%S' ,
19+ level = logging .INFO
20+ )
21+
22+ def get_datasets ():
3323 historical_time = xr .cftime_range ('1971-01-01' , '2000-12-31' , freq = 'D' , calendar = 'noleap' )
3424 future_time = xr .cftime_range ('2001-01-01' , '2030-12-31' , freq = 'D' , calendar = 'noleap' )
3525
36- get_hist_temp_for_lat = lambda val : 273.15 - (val * np .cos (2 * np .pi * historical_time .dayofyear / 365 ) + 2 * np .random .random_sample ((historical_time .size ,)) + 273.15 + .1 * (historical_time - historical_time [0 ]).days / 365 )
37- get_rand = lambda : np .random .rand () if np .random .rand () > .5 else - np .random .rand ()
26+ def get_hist_temp_for_lat (lat : int ) -> List [float ]:
27+ '''Returns a fake time seires by latitude value'''
28+ return 273.15 - (
29+ lat * np .cos (
30+ 2 * np .pi * historical_time .dayofyear / 365
31+ ) + 2 * np .random .random_sample (
32+ (historical_time .size ,)
33+ ) + 273.15 + .1 * (
34+ historical_time - historical_time [0 ]
35+ ).days / 365
36+ )
37+
3838 latitudes = np .arange (23 ,27 ,1 )
3939 some_data = [get_hist_temp_for_lat (val ) for val in latitudes ]
4040 data = np .array ([some_data , np .array (some_data )+ 1 ])
4141
42- def get_dataset (data ) -> xr .Dataset :
42+ def get_dataset (data , time ):
43+ '''Returns a data set by data and time'''
4344 return xr .DataArray (
4445 data ,
4546 dims = ('lon' , 'lat' , 'time' ),
46- coords = {'time' : historical_time , 'lat' : latitudes , 'lon' : [0 ,1 ]},
47+ coords = {'time' : time , 'lat' : latitudes , 'lon' : [0 ,1 ]},
4748 attrs = {'units' : '°C' },
4849 ).transpose ('time' ,'lat' ,'lon' ).to_dataset (name = 'tas' )
4950
5051
51- obsh = get_dataset (data )
52- simh = get_dataset (data - 2 )
53- simp = get_dataset (data - 1 )
52+ obsh = get_dataset (data , historical_time )
53+ obsp = get_dataset (data + 1 , historical_time )
54+ simh = get_dataset (data - 2 , historical_time )
55+ simp = get_dataset (data - 1 , future_time )
56+ return obsh , obsp , simh , simp
5457
55- cm = CMethods ()
5658
59+ def test_linear_scaling () -> None :
60+ '''Tests the linear scaling method'''
61+ obsh , obsp , simh , simp = get_datasets ()
5762 logging .info ('Testing 1d-methods ...' )
58- assert type ( cm .linear_scaling (
63+ ls_result = CMethods () .linear_scaling (
5964 obs = obsh ['tas' ][:,0 ,0 ],
6065 simh = simh ['tas' ][:,0 ,0 ],
6166 simp = simp ['tas' ][:,0 ,0 ],
6267 kind = '+'
63- )) == xr .DataArray , 'Invalid return type!'
68+ )
69+ assert isinstance (ls_result , xr .core .dataarray .DataArray )
70+ assert mean_squared_error (ls_result , obsp ['tas' ][:,0 ,0 ], squared = False ) < mean_squared_error (simp ['tas' ][:,0 ,0 ], obsp ['tas' ][:,0 ,0 ], squared = False )
6471 logging .info ('Linear Scaling done!' )
65- assert type (cm .variance_scaling (
72+
73+
74+ def test_variance_scaling () -> None :
75+ '''Tests the variance scaling method'''
76+ obsh , obsp , simh , simp = get_datasets ()
77+ vs_result = CMethods ().variance_scaling (
6678 obs = obsh ['tas' ][:,0 ,0 ],
6779 simh = simh ['tas' ][:,0 ,0 ],
6880 simp = simp ['tas' ][:,0 ,0 ],
6981 kind = '+'
70- )) == xr .DataArray , 'Invalid return type!'
82+ )
83+ assert isinstance (vs_result , xr .core .dataarray .DataArray )
84+ assert mean_squared_error (
85+ vs_result , obsp ['tas' ][:,0 ,0 ], squared = False
86+ ) < mean_squared_error (
87+ simp ['tas' ][:,0 ,0 ], obsp ['tas' ][:,0 ,0 ], squared = False
88+ )
7189 logging .info ('Variance Scaling done!' )
7290
73- assert type (cm .delta_method (
91+ def test_delta_method () -> None :
92+ '''Tests the delta method'''
93+ obsh , obsp , simh , simp = get_datasets ()
94+ dm_result = CMethods ().delta_method (
7495 obs = obsh ['tas' ][:,0 ,0 ],
7596 simh = simh ['tas' ][:,0 ,0 ],
7697 simp = simp ['tas' ][:,0 ,0 ],
7798 kind = '+'
78- )) == xr .DataArray , 'Invalid return type!'
99+ )
100+ assert isinstance (dm_result , xr .core .dataarray .DataArray )
101+ assert mean_squared_error (
102+ dm_result , obsp ['tas' ][:,0 ,0 ], squared = False
103+ ) < mean_squared_error (
104+ simp ['tas' ][:,0 ,0 ], obsp ['tas' ][:,0 ,0 ], squared = False
105+ )
79106 logging .info ('Delta Method done!' )
80107
81- assert type (cm .quantile_mapping (
108+ def test_quantile_mapping () -> None :
109+ '''Tests the quantile mapping method'''
110+ obsh , obsp , simh , simp = get_datasets ()
111+ qm_result = CMethods ().quantile_mapping (
82112 obs = obsh ['tas' ][:,0 ,0 ],
83113 simh = simh ['tas' ][:,0 ,0 ],
84114 simp = simp ['tas' ][:,0 ,0 ],
85115 n_quantiles = 100 ,
86116 kind = '+'
87- )) == xr .DataArray , 'Invalid return type!'
88-
89- assert type (cm .quantile_delta_mapping (
117+ )
118+ assert isinstance (qm_result , xr .core .dataarray .DataArray )
119+ assert mean_squared_error (
120+ qm_result , obsp ['tas' ][:,0 ,0 ], squared = False
121+ ) < mean_squared_error (
122+ simp ['tas' ][:,0 ,0 ], obsp ['tas' ][:,0 ,0 ], squared = False
123+ )
124+ logging .info ('Quantile Mapping done!' )
125+
126+ def test_quantile_delta_mapping () -> None :
127+ '''Tests the quantile delta mapping method'''
128+ obsh , obsp , simh , simp = get_datasets ()
129+ qdm_result = CMethods ().quantile_delta_mapping (
90130 obs = obsh ['tas' ][:,0 ,0 ],
91131 simh = simh ['tas' ][:,0 ,0 ],
92132 simp = simp ['tas' ][:,0 ,0 ],
93- n_quantiles = 100 ,
94- kind = '+'
95- )) == xr .DataArray , 'Invalid return type!'
96- logging .info ('Quantile Delta Mapping done!' )
133+ n_quantiles = 100 ,
134+ kind = '+'
135+ )
97136
137+ assert isinstance (qdm_result , xr .core .dataarray .DataArray )
138+ assert mean_squared_error (
139+ qdm_result , obsp ['tas' ][:,0 ,0 ], squared = False
140+ ) < mean_squared_error (
141+ simp ['tas' ][:,0 ,0 ], obsp ['tas' ][:,0 ,0 ], squared = False
142+ )
143+ logging .info ('Quantile Delta Mapping done!' )
98144
99- logging .info ('Testing 3d-methods' )
100145
101- for method in cm .SCALING_METHODS :
102- logging .info (f'Testing { method } ...' )
103- assert type (cm .adjust_3d (
104- method = method ,
105- obs = obsh ['tas' ],
106- simh = simh ['tas' ],
146+ def test_3d_sclaing_methods () -> None :
147+ '''Tests the scaling based methods for 3-dimentsional data sets'''
148+ obsh , obsp , simh , simp = get_datasets ()
149+ for method in CMethods ().SCALING_METHODS :
150+ logging .info (f'Testing { method } ...' )
151+ result = CMethods ().adjust_3d (
152+ method = method ,
153+ obs = obsh ['tas' ],
154+ simh = simh ['tas' ],
107155 simp = simp ['tas' ],
108156 kind = '+' ,
109157 goup = 'time.month'
110- )) == xr .DataArray
111- logging .info (f'{ method } - success!' )
112-
113- for method in cm .DISTRIBUTION_METHODS :
114- logging .info (f'Testing { method } ...' )
115- assert type (cm .adjust_3d (
116- method = method ,
117- obs = obsh ['tas' ],
118- simh = simh ['tas' ],
158+ )
159+ assert isinstance (result , xr .core .dataarray .DataArray )
160+ for lat in range (len (obsh .lat )):
161+ for lon in range (len (obsh .lon )):
162+ assert mean_squared_error (
163+ result [:,lat ,lon ], obsp ['tas' ][:,lat ,lon ], squared = False
164+ ) < mean_squared_error (
165+ simp ['tas' ][:,lat ,lon ], obsp ['tas' ][:,lat ,lon ], squared = False
166+ )
167+ logging .info (f'3d { method } - success!' )
168+
169+ def test_3d_distribution_methods () -> None :
170+ '''Tests the distribution based methods for 3-dimentsional data sets'''
171+ obsh , obsp , simh , simp = get_datasets ()
172+ for method in CMethods ().DISTRIBUTION_METHODS :
173+ logging .info (f'Testing { method } ...' )
174+ result = CMethods ().adjust_3d (
175+ method = method ,
176+ obs = obsh ['tas' ],
177+ simh = simh ['tas' ],
119178 simp = simp ['tas' ],
120179 n_quantiles = 100
121- )) == xr .DataArray
122- logging .info (f'{ method } - success!' )
123-
180+ )
181+ assert isinstance (result , xr .core .dataarray .DataArray )
182+ for lat in range (len (obsh .lat )):
183+ for lon in range (len (obsh .lon )):
184+ assert mean_squared_error (
185+ result [:,lat ,lon ], obsp ['tas' ][:,lat ,lon ], squared = False
186+ ) < mean_squared_error (
187+ simp ['tas' ][:,lat ,lon ], obsp ['tas' ][:,lat ,lon ], squared = False
188+ )
189+ logging .info (f'3d { method } - success!' )
190+
191+ def main () -> None :
192+ '''Main'''
124193
194+ test_linear_scaling ()
195+ test_variance_scaling ()
196+ test_delta_method ()
197+ test_quantile_mapping ()
198+ test_quantile_delta_mapping ()
199+ test_3d_sclaing_methods ()
200+ test_3d_distribution_methods ()
125201
126202if __name__ == '__main__' :
127203 main ()
128- logging .info ('Everything passed!' )
204+ logging .info ('Everything passed!' )
0 commit comments