From a961e828210c271b81c35db1ecfdf3213d5fd1a4 Mon Sep 17 00:00:00 2001 From: Tom White Date: Tue, 11 Feb 2025 14:32:42 +0000 Subject: [PATCH] Add rechunk ERA5 benchmark Delete all intermediate data at the end of the test to save on unnecessary cloud storage costs Increase lithops mem to 3.5GB Measure reserved memory Always delete zarr intermediate data Extra buffer copies for S3 --- tests/benchmarks/test_array.py | 54 ++++++++++++++++++++++++++++++ tests/configs/lithops_aws_dev.yaml | 7 ++++ 2 files changed, 61 insertions(+) create mode 100644 tests/configs/lithops_aws_dev.yaml diff --git a/tests/benchmarks/test_array.py b/tests/benchmarks/test_array.py index a58679e..2b8c676 100644 --- a/tests/benchmarks/test_array.py +++ b/tests/benchmarks/test_array.py @@ -6,12 +6,22 @@ import xarray as xr import cubed +import cubed as xp import cubed.random from cubed.core.optimization import multiple_inputs_optimize_dag, simple_optimize_dag from cubed.diagnostics.rich import RichProgressBar from ..utils import run +def test_measure_reserved_mem(runtime): + spec = runtime + + if spec.executor.name in ("single-threaded", "threads"): + pytest.skip(f"Don't measure reserved memory on {spec.executor.name}") + + reserved_memory = cubed.measure_reserved_mem(executor=spec.executor, work_dir=spec.work_dir) + print("reserved memory", reserved_memory) + @pytest.mark.parametrize("optimizer", ["new-optimizer"]) @pytest.mark.parametrize("t_length", [50, 500, 5000]) @@ -90,3 +100,47 @@ def test_quadratic_means_xarray(tmp_path, runtime, benchmark_all, optimizer, t_l fs.rm(path, recursive=True) except FileNotFoundError: pass + + +@pytest.mark.skip(reason="Skipping due to large computation") +def test_rechunk_era5(tmp_path, runtime, benchmark_all): + spec = runtime + + if spec.executor.name in ("single-threaded", "threads"): + pytest.skip(f"Don't run large computation on {spec.executor.name}") + + # from https://github.com/pangeo-data/rechunker/pull/89 + shape = (350640, 721, 1440) + 
source_chunks = (31, 721, 1440) + target_chunks = (350640, 10, 10) + + # set the random seed to ensure deterministic results + random.seed(42) + + # create zarr test data (not timed) + a = cubed.random.random(shape, dtype=xp.float32, chunks=source_chunks, spec=spec) + path = f"{spec.work_dir}/a.zarr" + cubed.store([a], [path], compute_arrays_in_parallel=True, callbacks=[RichProgressBar()]) + + a = cubed.from_zarr(path, spec=spec) + result = a.rechunk(chunks=target_chunks, use_new_impl=True) + + result.visualize(filename=tmp_path / "rechunk_era5") + + try: + # time only the computing of the result + run( + result, + executor=spec.executor, + benchmarks=benchmark_all, + callbacks=[RichProgressBar()], + _return_in_memory_array=False, # don't load result into memory! + ) + finally: + # delete zarr intermediate data (not timed) + try: + work_dir = spec.work_dir + fs, _, _ = fsspec.get_fs_token_paths(work_dir) + fs.rm(work_dir, recursive=True) + except FileNotFoundError: + pass diff --git a/tests/configs/lithops_aws_dev.yaml b/tests/configs/lithops_aws_dev.yaml new file mode 100644 index 0000000..a8866b4 --- /dev/null +++ b/tests/configs/lithops_aws_dev.yaml @@ -0,0 +1,7 @@ +spec: + work_dir: "s3://cubed-tom-temp" + allowed_mem: "3.5GB" + executor_name: "lithops" + executor_options: + runtime: "cubed-runtime-dev" + runtime_memory: 3500