Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
92744dc
In StepsBlendingNowcaster, add an option to compute a single blended …
ladc May 26, 2026
f193b9a
Remove duplicate documentation in steps blending.
ladc May 26, 2026
5dcabb2
Add tests pysteps blending single member mode (used for parallellizin…
ladc May 27, 2026
6238329
Refactor StepsBlendingConfig to replace single_member_mode with model…
ladc May 27, 2026
4958692
Improve vague error message in steps blending; adapt tests accordingly.
ladc May 27, 2026
695af3e
Add complevel argument to netcdf exporter + tests (#550)
ladc May 27, 2026
cf0d129
Check past skill dimensions in save_skill and calc_clim_skill.
ladc May 27, 2026
cb27f2b
Add write_skill flag and avoid error-prone dependence on model_index_…
ladc May 27, 2026
657d274
Merge branch 'master' into allow_single_member_blending
ladc May 27, 2026
026b231
Correctly compute n_model also when blending multiple NWP members.
ladc May 27, 2026
6dd888d
Add steps blending test with nonzero model_index_offset
ladc May 27, 2026
af454a0
Remove out-of-sync list of blended forecast arguments from documentat…
ladc May 27, 2026
83664b4
Remove the model_index_offset which has become superfluous now that w…
ladc May 27, 2026
85047ad
Restore requirements in documentation of steps blending forecast method.
ladc May 27, 2026
3e69692
Restore old formatting of tests after terrible accident due to remove…
ladc May 27, 2026
ae6d405
Silence codacity with correct position of doc summary (on the second …
ladc May 27, 2026
f52c8a8
docs: restore forecast parameter doc formatting
Copilot May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pysteps/blending/clim.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def save_skill(
# Append skill to the list of the past X daily averages.
if (
past_skill is not None
and past_skill.ndim >= 3
and past_skill.shape[2] == n_cascade_levels
and past_skill.shape[1] == skill_today["mean_skill"].shape[0]
):
Expand Down Expand Up @@ -189,6 +190,13 @@ def calc_clim_skill(
# past_skill has dimensions date x model x scale_level x ....
if past_skill_file.is_file():
past_skill = np.load(past_skill_file)
# Don't use stored file if the shape no longer matches the requested configuration.
if (
past_skill.ndim < 3
or past_skill.shape[1] != n_models
or past_skill.shape[2] != n_cascade_levels
):
past_skill = np.array(None)
else:
past_skill = np.array(None)
# check if there is enough data to compute the climatological skill
Expand Down
132 changes: 77 additions & 55 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,20 @@ class StepsBlendingConfig:
Time step of the motion vectors (minutes). Required if vel_pert_method is
not None or mask_method is 'incremental'.
n_ens_members: int
The number of ensemble members to generate. This number should always be
equal to or larger than the number of NWP ensemble members / number of
NWP models.
The number of ensemble members to generate. Must be equal to or larger
than the number of NWP ensemble members / number of NWP models.
n_cascade_levels: int, optional
The number of cascade levels to use. Defaults to 6,
see issue #385 on GitHub.
blend_nwp_members: bool
Check if NWP models/members should be used individually, or if all of
them are blended together per nowcast ensemble member. Standard set to
false.
write_skill: bool, optional
If True, update the rolling climatological skill file at each timestep.
Set to False for workers that should not write skill (e.g. duplicate
workers in a process pool that share a skill directory with a designated
writer). Defaults to True.
extrapolation_method: str, optional
Name of the extrapolation method to use. See the documentation of
:py:mod:`pysteps.extrapolation.interface`.
Expand Down Expand Up @@ -316,6 +320,7 @@ class StepsBlendingConfig:
velocity_perturbation_kwargs: dict[str, Any] = field(default_factory=dict)
climatology_kwargs: dict[str, Any] = field(default_factory=dict)
mask_kwargs: dict[str, Any] = field(default_factory=dict)
write_skill: bool = True
measure_time: bool = False
callback: Any | None = None
return_output: bool = True
Expand Down Expand Up @@ -1834,11 +1839,17 @@ def __find_nowcast_NWP_combination(self, t):
self.__state.velocity_models_timestep = self.__velocity_models[
:, t, :, :, :
].astype(np.float64, copy=False)
# Make sure the number of model members is not larger than or equal to n_ens_members
# Make sure the number of model members is not larger than n_ens_members.
# For process-pool / MPI parallelism, slice precip_models and velocity_models
# per worker (e.g. precip_models[i:i+1]) and use a per-worker skill directory.
n_model_members = self.__state.precip_models_cascades_timestep.shape[0]
if n_model_members > self.__config.n_ens_members:
raise ValueError(
"The number of NWP model members is larger than the given number of ensemble members. n_model_members <= n_ens_members."
f"Number of NWP models ({n_model_members}) exceeds ensemble size ({self.__config.n_ens_members}). "
f"\n\nQuick fix: Increase n_ens_members to at least {n_model_members}, or reduce NWP models to {self.__config.n_ens_members}."
f"\n\nFor parallel processing: Slice precip_models and velocity_models per worker "
f"(e.g., precip_models[i:i+n]) and give each worker its own outdir_path_skill sub-directory. "
f"This allows each worker to process one model independently. See documentation for details."
)

# Check if NWP models/members should be used individually, or if all of
Expand All @@ -1854,7 +1865,9 @@ def __find_nowcast_NWP_combination(self, t):
n_ens_members_provided = self.__precip_nowcast.shape[0]
if n_ens_members_provided > self.__config.n_ens_members:
raise ValueError(
"The number of nowcast ensemble members provided is larger than the given number of ensemble members requested. n_ens_members_provided <= n_ens_members."
"The number of nowcast ensemble members provided is larger than the "
"given number of ensemble members requested. Either increase "
"n_ens_members or reduce the number of provided nowcast members."
)

n_ens_members_max = self.__config.n_ens_members
Expand Down Expand Up @@ -2072,12 +2085,13 @@ def __determine_skill_for_current_timestep(self, t):
)

# Save this in the climatological skill file
blending.clim.save_skill(
current_skill=self.__params.rho_nwp_models,
validtime=self.__issuetime,
outdir_path=self.__config.outdir_path_skill,
**self.__params.climatology_kwargs,
)
if self.__config.write_skill:
blending.clim.save_skill(
current_skill=self.__params.rho_nwp_models,
validtime=self.__issuetime,
outdir_path=self.__config.outdir_path_skill,
**self.__params.climatology_kwargs,
)
if t > 0:
# Determine the skill of the components for lead time (t0 + t)
# First for the extrapolation component. Only calculate it when t > 0.
Expand Down Expand Up @@ -3348,6 +3362,7 @@ def forecast(
precip_nowcast=None,
n_cascade_levels=6,
blend_nwp_members=False,
write_skill=True,
Comment thread
ladc marked this conversation as resolved.
precip_thr=None,
norain_thr=0.0,
kmperpixel=None,
Expand Down Expand Up @@ -3385,71 +3400,72 @@ def forecast(
Generate a blended nowcast ensemble by using the Short-Term Ensemble
Prediction System (STEPS) method.

This is a convenience wrapper around :class:`StepsBlendingNowcaster` that
accepts flat keyword arguments instead of a :class:`StepsBlendingConfig`
object. All configuration parameters are documented in
:class:`StepsBlendingConfig`; only the data-input arguments unique to this
function are described here.

Parameters
----------
precip: array-like
Array of shape (ar_order+1,m,n) containing the input precipitation fields
ordered by timestamp from oldest to newest. The time steps between the
inputs are assumed to be regular.
precip_models: array-like
Either raw (NWP) model forecast data or decomposed (NWP) model forecast data.
If you supply decomposed data, it needs to be an array of shape
(n_models,timesteps+1) containing, per timestep (t=0 to lead time here) and
per (NWP) model or model ensemble member, a dictionary with a list of cascades
obtained by calling a method implemented in :py:mod:`pysteps.cascade.decomposition`.
If you supply the original (NWP) model forecast data, it needs to be an array of shape
(n_models,timestep+1,m,n) containing precipitation (or other) fields, which will
then be decomposed in this function.
Either raw (NWP) model forecast data or decomposed (NWP) model forecast
data. If you supply decomposed data, it needs to be an array of shape
(n_models,timesteps+1) containing, per timestep (t=0 to lead time here)
and per (NWP) model or model ensemble member, a dictionary with a list of
cascades obtained by calling a method implemented in
:py:mod:`pysteps.cascade.decomposition`.
If you supply the original (NWP) model forecast data, it needs to be an
array of shape (n_models,timestep+1,m,n) containing precipitation (or
other) fields, which will then be decomposed in this function.

Depending on your use case it can be advantageous to decompose the model
forecasts outside beforehand, as this slightly reduces calculation times.
This is possible with :py:func:`pysteps.blending.utils.decompose_NWP`,
:py:func:`pysteps.blending.utils.compute_store_nwp_motion`, and
:py:func:`pysteps.blending.utils.load_NWP`. However, if you have a lot of (NWP) model
members (e.g. 1 model member per nowcast member), this can lead to excessive memory
usage.
:py:func:`pysteps.blending.utils.load_NWP`. However, if you have a lot of
(NWP) model members (e.g. 1 model member per nowcast member), this can
lead to excessive memory usage.

To further reduce memory usage, both this array and the ``velocity_models`` array
can be given as float32. They will then be converted to float64 before computations
to minimize loss in precision.
To further reduce memory usage, both this array and the
``velocity_models`` array can be given as float32. They will then be
converted to float64 before computations to minimise loss in precision.

In case of one (deterministic) model as input, add an extra dimension to make sure
precip_models is four dimensional prior to calling this function.
In case of one (deterministic) model as input, add an extra dimension to
make sure precip_models is four dimensional prior to calling this
function.
velocity: array-like
Array of shape (2,m,n) containing the x- and y-components of the advection
field. The velocities are assumed to represent one time step between the
inputs. All values are required to be finite.
Array of shape (2,m,n) containing the x- and y-components of the
advection field. The velocities are assumed to represent one time step
between the inputs. All values are required to be finite.
velocity_models: array-like
Array of shape (n_models,timestep,2,m,n) containing the x- and y-components
of the advection field for the (NWP) model field per forecast lead time.
All values are required to be finite. To reduce memory usage, this array can
be given as float32. They will then be converted to float64 before computations
to minimize loss in precision.
Array of shape (n_models,timestep,2,m,n) containing the x- and
y-components of the advection field for the (NWP) model field per
forecast lead time. All values are required to be finite. To reduce
memory usage, this array can be given as float32.
timesteps: int or list of floats
Number of time steps to forecast or a list of time steps for which the
forecasts are computed (relative to the input time step). The elements of
the list are required to be in ascending order.
timestep: float
Time step of the motion vectors (minutes). Required if vel_pert_method is
not None or mask_method is 'incremental'.
Time step of the motion vectors (minutes). Required if
``vel_pert_method`` is not ``None`` or ``mask_method`` is
``'incremental'``.
issuetime: datetime
is issued.
Issue time of the forecast.
n_ens_members: int
The number of ensemble members to generate. This number should always be
equal to or larger than the number of NWP ensemble members / number of
NWP models.
The number of ensemble members to generate. Must be equal to or larger
than the number of NWP ensemble members / number of NWP models.
precip_nowcast: array-like, optional
Optional input with array of shape (n_ens_members,timestep+1,m,n) containing
and external nowcast as input to the blending. If precip_nowcast is provided,
the autoregression step and advection step will be omitted for the
extrapolation cascade of the blending procedure and instead, precip_nowcast
will be used as estimate. Defaults to None (which is the standard STEPS)
method described in :cite:`Imhoff2023`.
Note that nowcasting_method should be set to 'external_nowcast' if
precip_nowcast is not None.
Note that in the current setup, only a deterministic precip_nowcast model can
be provided and only one ensemble member (without noise generation) is
returned. This will change soon.
Optional array of shape (n_ens_members,timestep+1,m,n) containing an
external nowcast. When provided, the autoregression and advection step
are skipped for the extrapolation cascade and this field is used instead.
Requires ``nowcasting_method='external_nowcast'``. Defaults to None.

n_cascade_levels: int, optional
The number of cascade levels to use. Defaults to 6,
see issue #385 on GitHub.
Expand Down Expand Up @@ -3648,6 +3664,7 @@ def forecast(
measure_time: bool
If set to True, measure, print and return the computation time.


Returns
-------
out: ndarray
Expand All @@ -3662,8 +3679,12 @@ def forecast(

See also
--------
:py:mod:`pysteps.extrapolation.interface`, :py:mod:`pysteps.cascade.interface`,
:py:mod:`pysteps.noise.interface`, :py:func:`pysteps.noise.utils.compute_noise_stddev_adjs`
:class:`StepsBlendingConfig`,
:class:`StepsBlendingNowcaster`,
:py:mod:`pysteps.extrapolation.interface`,
:py:mod:`pysteps.cascade.interface`,
:py:mod:`pysteps.noise.interface`,
:py:func:`pysteps.noise.utils.compute_noise_stddev_adjs`

References
----------
Expand Down Expand Up @@ -3702,6 +3723,7 @@ def forecast(
n_ens_members=n_ens_members,
n_cascade_levels=n_cascade_levels,
blend_nwp_members=blend_nwp_members,
write_skill=write_skill,
precip_threshold=precip_thr,
norain_threshold=norain_thr,
kmperpixel=kmperpixel,
Expand Down
70 changes: 69 additions & 1 deletion pysteps/tests/test_blending_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
(5, 10, 1, 8,'external_nowcast_ens', "incremental", "obs", False, "spn", True, 5, False, False, 0, False, None, None, None),
(1, 10, 5, 8,'external_nowcast_ens', "incremental", "cdf", False, "bps", True, 5, False, False, 0, False, None, None, 5)
]

# fmt:on


Expand Down Expand Up @@ -148,6 +147,7 @@ def test_steps_blending(
vel_pert_method,
max_mask_rim,
timestep_start_full_nwp_weight,
write_skill=True,
):
pytest.importorskip("cv2")

Expand Down Expand Up @@ -398,6 +398,7 @@ def test_steps_blending(
n_ens_members=n_ens_members,
n_cascade_levels=n_cascade_levels,
blend_nwp_members=blend_nwp_members,
write_skill=write_skill,
precip_thr=metadata["threshold"],
kmperpixel=1.0,
extrap_method="semilagrangian",
Expand Down Expand Up @@ -554,3 +555,70 @@ def test_steps_blending_partial_zero_radar(ar_order):
converter=converter,
metadata=metadata,
)


@pytest.mark.parametrize(
"write_skill",
[
True, # designated skill writer (one per NWP model in the pool)
False, # duplicate worker: forecast runs but skill is not written
],
)
def test_steps_blending_single_process(write_skill):
"""
Simulate one process-pool worker holding a single NWP model slice.

This mirrors the one-per-proc mode in multiprocessing.py: each worker
receives one model's precip/velocity slice, its own outdir_path_skill
sub-directory, and write_skill=True only for the designated skill writer
(one per NWP model). With 2 NWP models and 2 STEPS members, both
workers are skill writers (one each). The forecast output shape must
be correct regardless of the write_skill flag.
"""
test_steps_blending(
n_models=1,
timesteps=3,
n_ens_members=1,
n_cascade_levels=6,
nowcasting_method="steps",
mask_method=None,
probmatching_method=None,
blend_nwp_members=False,
weights_method="spn",
decomposed_nwp=True,
expected_n_ens_members=1,
zero_radar=False,
zero_nwp=False,
smooth_radar_mask_range=0,
resample_distribution=False,
vel_pert_method=None,
max_mask_rim=None,
timestep_start_full_nwp_weight=None,
write_skill=write_skill,
)
Comment thread
ladc marked this conversation as resolved.


def test_raises_when_n_models_exceeds_n_ens_members():
"""n_model_members > n_ens_members must raise ValueError (user forgot to slice)."""
pytest.importorskip("cv2")
with pytest.raises(ValueError, match="[Ss]lice"):
test_steps_blending(
n_models=3,
timesteps=3,
n_ens_members=1,
n_cascade_levels=6,
nowcasting_method="steps",
mask_method=None,
probmatching_method=None,
blend_nwp_members=False,
weights_method="spn",
decomposed_nwp=True,
expected_n_ens_members=1,
zero_radar=False,
zero_nwp=False,
smooth_radar_mask_range=0,
resample_distribution=False,
vel_pert_method=None,
max_mask_rim=None,
timestep_start_full_nwp_weight=None,
)
Loading