diff --git a/pysteps/blending/clim.py b/pysteps/blending/clim.py index 82656b27..75399966 100644 --- a/pysteps/blending/clim.py +++ b/pysteps/blending/clim.py @@ -121,6 +121,7 @@ def save_skill( # Append skill to the list of the past X daily averages. if ( past_skill is not None + and past_skill.ndim >= 3 and past_skill.shape[2] == n_cascade_levels and past_skill.shape[1] == skill_today["mean_skill"].shape[0] ): @@ -189,6 +190,13 @@ def calc_clim_skill( # past_skill has dimensions date x model x scale_level x .... if past_skill_file.is_file(): past_skill = np.load(past_skill_file) + # Don't use stored file if the shape no longer matches the requested configuration. + if ( + past_skill.ndim < 3 + or past_skill.shape[1] != n_models + or past_skill.shape[2] != n_cascade_levels + ): + past_skill = np.array(None) else: past_skill = np.array(None) # check if there is enough data to compute the climatological skill diff --git a/pysteps/blending/steps.py b/pysteps/blending/steps.py index fc1530f0..d8d371e4 100644 --- a/pysteps/blending/steps.py +++ b/pysteps/blending/steps.py @@ -92,9 +92,8 @@ class StepsBlendingConfig: Time step of the motion vectors (minutes). Required if vel_pert_method is not None or mask_method is 'incremental'. n_ens_members: int - The number of ensemble members to generate. This number should always be - equal to or larger than the number of NWP ensemble members / number of - NWP models. + The number of ensemble members to generate. Must be equal to or larger + than the number of NWP ensemble members / number of NWP models. n_cascade_levels: int, optional The number of cascade levels to use. Defaults to 6, see issue #385 on GitHub. @@ -102,6 +101,11 @@ class StepsBlendingConfig: Check if NWP models/members should be used individually, or if all of them are blended together per nowcast ensemble member. Standard set to false. + write_skill: bool, optional + If True, update the rolling climatological skill file at each timestep. + Set to False for workers that should not write skill (e.g. duplicate + workers in a process pool that share a skill directory with a designated + writer). Defaults to True. extrapolation_method: str, optional Name of the extrapolation method to use. See the documentation of :py:mod:`pysteps.extrapolation.interface`. @@ -316,6 +320,7 @@ class StepsBlendingConfig: velocity_perturbation_kwargs: dict[str, Any] = field(default_factory=dict) climatology_kwargs: dict[str, Any] = field(default_factory=dict) mask_kwargs: dict[str, Any] = field(default_factory=dict) + write_skill: bool = True measure_time: bool = False callback: Any | None = None return_output: bool = True @@ -1834,11 +1839,17 @@ def __find_nowcast_NWP_combination(self, t): self.__state.velocity_models_timestep = self.__velocity_models[ :, t, :, :, : ].astype(np.float64, copy=False) - # Make sure the number of model members is not larger than or equal to n_ens_members + # Make sure the number of model members is not larger than n_ens_members. + # For process-pool / MPI parallelism, slice precip_models and velocity_models + # per worker (e.g. precip_models[i:i+1]) and use a per-worker skill directory. n_model_members = self.__state.precip_models_cascades_timestep.shape[0] if n_model_members > self.__config.n_ens_members: raise ValueError( - "The number of NWP model members is larger than the given number of ensemble members. n_model_members <= n_ens_members." + f"Number of NWP models ({n_model_members}) exceeds ensemble size ({self.__config.n_ens_members}). " + f"\n\nQuick fix: Increase n_ens_members to at least {n_model_members}, or reduce NWP models to {self.__config.n_ens_members}." + f"\n\nFor parallel processing: Slice precip_models and velocity_models per worker " + f"(e.g., precip_models[i:i+n]) and give each worker its own outdir_path_skill sub-directory. " + f"This allows each worker to process one model independently. See documentation for details." ) # Check if NWP models/members should be used individually, or if all of @@ -1854,7 +1865,9 @@ def __find_nowcast_NWP_combination(self, t): n_ens_members_provided = self.__precip_nowcast.shape[0] if n_ens_members_provided > self.__config.n_ens_members: raise ValueError( - "The number of nowcast ensemble members provided is larger than the given number of ensemble members requested. n_ens_members_provided <= n_ens_members." + "The number of nowcast ensemble members provided is larger than the " + "given number of ensemble members requested. Either increase " + "n_ens_members or reduce the number of provided nowcast members." ) n_ens_members_max = self.__config.n_ens_members @@ -2072,12 +2085,13 @@ def __determine_skill_for_current_timestep(self, t): ) # Save this in the climatological skill file - blending.clim.save_skill( - current_skill=self.__params.rho_nwp_models, - validtime=self.__issuetime, - outdir_path=self.__config.outdir_path_skill, - **self.__params.climatology_kwargs, - ) + if self.__config.write_skill: + blending.clim.save_skill( + current_skill=self.__params.rho_nwp_models, + validtime=self.__issuetime, + outdir_path=self.__config.outdir_path_skill, + **self.__params.climatology_kwargs, + ) if t > 0: # Determine the skill of the components for lead time (t0 + t) # First for the extrapolation component. Only calculate it when t > 0. @@ -3348,6 +3362,7 @@ def forecast( precip_nowcast=None, n_cascade_levels=6, blend_nwp_members=False, + write_skill=True, precip_thr=None, norain_thr=0.0, kmperpixel=None, @@ -3385,6 +3400,12 @@ def forecast( Generate a blended nowcast ensemble by using the Short-Term Ensemble Prediction System (STEPS) method. + This is a convenience wrapper around :class:`StepsBlendingNowcaster` that + accepts flat keyword arguments instead of a :class:`StepsBlendingConfig` + object. All configuration parameters are documented in + :class:`StepsBlendingConfig`; only the data-input arguments unique to this + function are described here. + Parameters ---------- precip: array-like @@ -3392,64 +3413,59 @@ def forecast( ordered by timestamp from oldest to newest. The time steps between the inputs are assumed to be regular. precip_models: array-like - Either raw (NWP) model forecast data or decomposed (NWP) model forecast data. - If you supply decomposed data, it needs to be an array of shape - (n_models,timesteps+1) containing, per timestep (t=0 to lead time here) and - per (NWP) model or model ensemble member, a dictionary with a list of cascades - obtained by calling a method implemented in :py:mod:`pysteps.cascade.decomposition`. - If you supply the original (NWP) model forecast data, it needs to be an array of shape - (n_models,timestep+1,m,n) containing precipitation (or other) fields, which will - then be decomposed in this function. + Either raw (NWP) model forecast data or decomposed (NWP) model forecast + data. If you supply decomposed data, it needs to be an array of shape + (n_models,timesteps+1) containing, per timestep (t=0 to lead time here) + and per (NWP) model or model ensemble member, a dictionary with a list of + cascades obtained by calling a method implemented in + :py:mod:`pysteps.cascade.decomposition`. + If you supply the original (NWP) model forecast data, it needs to be an + array of shape (n_models,timestep+1,m,n) containing precipitation (or + other) fields, which will then be decomposed in this function. Depending on your use case it can be advantageous to decompose the model forecasts outside beforehand, as this slightly reduces calculation times. This is possible with :py:func:`pysteps.blending.utils.decompose_NWP`, :py:func:`pysteps.blending.utils.compute_store_nwp_motion`, and - :py:func:`pysteps.blending.utils.load_NWP`. However, if you have a lot of (NWP) model - members (e.g. 1 model member per nowcast member), this can lead to excessive memory - usage. + :py:func:`pysteps.blending.utils.load_NWP`. However, if you have a lot of + (NWP) model members (e.g. 1 model member per nowcast member), this can + lead to excessive memory usage. - To further reduce memory usage, both this array and the ``velocity_models`` array - can be given as float32. They will then be converted to float64 before computations - to minimize loss in precision. + To further reduce memory usage, both this array and the + ``velocity_models`` array can be given as float32. They will then be + converted to float64 before computations to minimise loss in precision. - In case of one (deterministic) model as input, add an extra dimension to make sure - precip_models is four dimensional prior to calling this function. + In case of one (deterministic) model as input, add an extra dimension to + make sure precip_models is four dimensional prior to calling this + function. velocity: array-like - Array of shape (2,m,n) containing the x- and y-components of the advection - field. The velocities are assumed to represent one time step between the - inputs. All values are required to be finite. + Array of shape (2,m,n) containing the x- and y-components of the + advection field. The velocities are assumed to represent one time step + between the inputs. All values are required to be finite. velocity_models: array-like - Array of shape (n_models,timestep,2,m,n) containing the x- and y-components - of the advection field for the (NWP) model field per forecast lead time. - All values are required to be finite. To reduce memory usage, this array can - be given as float32. They will then be converted to float64 before computations - to minimize loss in precision. + Array of shape (n_models,timestep,2,m,n) containing the x- and + y-components of the advection field for the (NWP) model field per + forecast lead time. All values are required to be finite. To reduce + memory usage, this array can be given as float32. timesteps: int or list of floats Number of time steps to forecast or a list of time steps for which the forecasts are computed (relative to the input time step). The elements of the list are required to be in ascending order. timestep: float - Time step of the motion vectors (minutes). Required if vel_pert_method is - not None or mask_method is 'incremental'. + Time step of the motion vectors (minutes). Required if + ``vel_pert_method`` is not ``None`` or ``mask_method`` is + ``'incremental'``. issuetime: datetime - is issued. + Issue time of the forecast. n_ens_members: int - The number of ensemble members to generate. This number should always be - equal to or larger than the number of NWP ensemble members / number of - NWP models. + The number of ensemble members to generate. Must be equal to or larger + than the number of NWP ensemble members / number of NWP models. precip_nowcast: array-like, optional - Optional input with array of shape (n_ens_members,timestep+1,m,n) containing - and external nowcast as input to the blending. If precip_nowcast is provided, - the autoregression step and advection step will be omitted for the - extrapolation cascade of the blending procedure and instead, precip_nowcast - will be used as estimate. Defaults to None (which is the standard STEPS) - method described in :cite:`Imhoff2023`. - Note that nowcasting_method should be set to 'external_nowcast' if - precip_nowcast is not None. - Note that in the current setup, only a deterministic precip_nowcast model can - be provided and only one ensemble member (without noise generation) is - returned. This will change soon. + Optional array of shape (n_ens_members,timestep+1,m,n) containing an + external nowcast. When provided, the autoregression and advection step + are skipped for the extrapolation cascade and this field is used instead. + Requires ``nowcasting_method='external_nowcast'``. Defaults to None. + n_cascade_levels: int, optional The number of cascade levels to use. Defaults to 6, see issue #385 on GitHub. @@ -3648,6 +3664,7 @@ def forecast( measure_time: bool If set to True, measure, print and return the computation time. + Returns ------- out: ndarray @@ -3662,8 +3679,12 @@ def forecast( See also -------- - :py:mod:`pysteps.extrapolation.interface`, :py:mod:`pysteps.cascade.interface`, - :py:mod:`pysteps.noise.interface`, :py:func:`pysteps.noise.utils.compute_noise_stddev_adjs` + :class:`StepsBlendingConfig`, + :class:`StepsBlendingNowcaster`, + :py:mod:`pysteps.extrapolation.interface`, + :py:mod:`pysteps.cascade.interface`, + :py:mod:`pysteps.noise.interface`, + :py:func:`pysteps.noise.utils.compute_noise_stddev_adjs` References ---------- @@ -3702,6 +3723,7 @@ def forecast( n_ens_members=n_ens_members, n_cascade_levels=n_cascade_levels, blend_nwp_members=blend_nwp_members, + write_skill=write_skill, precip_threshold=precip_thr, norain_threshold=norain_thr, kmperpixel=kmperpixel, diff --git a/pysteps/tests/test_blending_steps.py b/pysteps/tests/test_blending_steps.py index abfa5522..b98aaae5 100644 --- a/pysteps/tests/test_blending_steps.py +++ b/pysteps/tests/test_blending_steps.py @@ -74,7 +74,6 @@ (5, 10, 1, 8,'external_nowcast_ens', "incremental", "obs", False, "spn", True, 5, False, False, 0, False, None, None, None), (1, 10, 5, 8,'external_nowcast_ens', "incremental", "cdf", False, "bps", True, 5, False, False, 0, False, None, None, 5) ] - # fmt:on @@ -148,6 +147,7 @@ def test_steps_blending( vel_pert_method, max_mask_rim, timestep_start_full_nwp_weight, + write_skill=True, ): pytest.importorskip("cv2") @@ -398,6 +398,7 @@ def test_steps_blending( n_ens_members=n_ens_members, n_cascade_levels=n_cascade_levels, blend_nwp_members=blend_nwp_members, + write_skill=write_skill, precip_thr=metadata["threshold"], kmperpixel=1.0, extrap_method="semilagrangian", @@ -554,3 +555,70 @@ def test_steps_blending_partial_zero_radar(ar_order): converter=converter, metadata=metadata, ) + + +@pytest.mark.parametrize( + "write_skill", + [ + True, # designated skill writer (one per NWP model in the pool) + False, # duplicate worker: forecast runs but skill is not written + ], +) +def test_steps_blending_single_process(write_skill): + """ + Simulate one process-pool worker holding a single NWP model slice. + + This mirrors the one-per-proc mode in multiprocessing.py: each worker + receives one model's precip/velocity slice, its own outdir_path_skill + sub-directory, and write_skill=True only for the designated skill writer + (one per NWP model). With 2 NWP models and 2 STEPS members, both + workers are skill writers (one each). The forecast output shape must + be correct regardless of the write_skill flag. + """ + test_steps_blending( + n_models=1, + timesteps=3, + n_ens_members=1, + n_cascade_levels=6, + nowcasting_method="steps", + mask_method=None, + probmatching_method=None, + blend_nwp_members=False, + weights_method="spn", + decomposed_nwp=True, + expected_n_ens_members=1, + zero_radar=False, + zero_nwp=False, + smooth_radar_mask_range=0, + resample_distribution=False, + vel_pert_method=None, + max_mask_rim=None, + timestep_start_full_nwp_weight=None, + write_skill=write_skill, + ) + + +def test_raises_when_n_models_exceeds_n_ens_members(): + """n_model_members > n_ens_members must raise ValueError (user forgot to slice).""" + pytest.importorskip("cv2") + with pytest.raises(ValueError, match="[Ss]lice"): + test_steps_blending( + n_models=3, + timesteps=3, + n_ens_members=1, + n_cascade_levels=6, + nowcasting_method="steps", + mask_method=None, + probmatching_method=None, + blend_nwp_members=False, + weights_method="spn", + decomposed_nwp=True, + expected_n_ens_members=1, + zero_radar=False, + zero_nwp=False, + smooth_radar_mask_range=0, + resample_distribution=False, + vel_pert_method=None, + max_mask_rim=None, + timestep_start_full_nwp_weight=None, + )