From 93b70165fa40d1bf9034c17774c879335adae552 Mon Sep 17 00:00:00 2001 From: Nicola Demo Date: Mon, 1 Jun 2026 12:51:18 +0200 Subject: [PATCH 1/2] adding graph timeseries --- .../condition/graph_time_series_condition.py | 171 ++++++++++ .../test_graph_time_series_condition.py | 319 ++++++++++++++++++ ...test_autoregressive_single_model_solver.py | 53 +-- 3 files changed, 524 insertions(+), 19 deletions(-) create mode 100644 pina/_src/condition/graph_time_series_condition.py create mode 100644 tests/test_condition/test_graph_time_series_condition.py diff --git a/pina/_src/condition/graph_time_series_condition.py b/pina/_src/condition/graph_time_series_condition.py new file mode 100644 index 000000000..d7dcbb3ca --- /dev/null +++ b/pina/_src/condition/graph_time_series_condition.py @@ -0,0 +1,171 @@ +"""Module for the GraphTimeSeriesCondition class.""" + +import torch +from pina._src.core.utils import check_consistency, check_positive_integer +from pina._src.data.manager.data_manager import _DataManager +from pina._src.condition.time_series_condition import TimeSeriesCondition +from pina._src.core.label_tensor import LabelTensor +from pina._src.condition.base_condition import BaseCondition +from torch_geometric.data import Data +from pina._src.core.graph import Graph + + +class GraphTimeSeriesCondition(TimeSeriesCondition): + """ + The :class:`GraphTimeSeriesCondition` class represents an autoregressive time + series condition defined by temporal ``input`` data. The input is expected + to have shape ``[trajectories, time_steps, *features]``, where the second + dimension corresponds to the temporal evolution of each trajectory. + + During training, the condition automatically extracts overlapping temporal + windows from the trajectories. The parameter ``unroll_length`` defines the + number of consecutive time steps contained in each temporal window, while + ``n_windows`` controls how many temporal windows are created from the + available trajectories. 
+ + Internally, the unrolled data is stored as a tensor of shape + ``[trajectories, n_windows, unroll_length, *features]``. + + Supported data types include :class:`torch_geometric.data.Data` and + :class:`~pina.graph.Graph`. + + :Example: + + >>> from pina import Condition, LabelTensor + >>> import torch + + >>> data = LabelTensor(torch.rand(5, 10, 2), labels=["u", "v"]) + >>> condition = Condition(input=data, unroll_length=5, n_windows=3) + """ + + # Available fields and input data types + __fields__ = ["input", "unroll_length", "n_windows", "randomize"] + _avail_input_cls = (Data, Graph) + + def __new__(cls, input, n_windows, unroll_length, key='x', randomize=False): + # Check consistency + check_consistency(input, cls._avail_input_cls) + check_consistency(randomize, bool) + check_consistency(key, str) + check_positive_integer(n_windows, strict=True) + check_positive_integer(unroll_length, strict=True) + + return BaseCondition.__new__(cls) + + def store_data(self, **kwargs): + """ + Store the unrolled time-series input data. + + The method extracts the time-series input data and creates the temporal + windows based on the specified ``unroll_length`` and ``n_windows``. + + :param dict kwargs: The keyword arguments containing the data to be + stored. + :return: A dictionary-like structure containing the stored data. + :rtype: _DataManager + """ + # Extract unrolling parameters from kwargs + unroll_length = kwargs.get("unroll_length") + n_windows = kwargs.get("n_windows") + randomize = kwargs.get("randomize", False) + key = kwargs.get("key", "x") + graph = kwargs.get("input") + + # Create unrolled windows from the input data + if isinstance(graph, Data): + if not hasattr(graph, key): + raise ValueError( + f"The provided graph does not have the specified key '{key}'." 
+ ) + unrolled_data = self._unroll( + data=graph.__getattribute__(key), + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + graph.__setattr__(key, unrolled_data) + + elif isinstance(graph, Graph): + for graph_ in graph: + if not hasattr(graph_, key): + raise ValueError( + f"One of the provided graphs does not have the specified key '{key}'." + ) + unrolled_data = self._unroll( + data=graph_.__getattribute__(key), + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + graph_.__setattr__(key, unrolled_data) + + return _DataManager(input=graph) + + def evaluate(self, batch, solver): + """ + Evaluate the residual of the condition on the given batch using the + solver. + + This method computes the per-step residuals through autoregressive + unrolling. A forward pass of the solver's model is performed at each + time step, and the per-step residuals (predicted - target) are + returned as a stacked tensor. + + The returned tensor preserves all per-step residual values without + reduction or loss aggregation. + + :param dict batch: The batch containing the data required by the + condition evaluation. + :param SolverInterface solver: The solver used to perform the forward + pass and compute the residual. The solver provides access to the + model and its parameters, which may be necessary for evaluating the + condition residual. + :raises ValueError: If the input tensor in the batch has less than 4 + dimensions. + :return: The stacked per-step residual tensor of shape + [time_steps - 1, trajectories, windows, *features]. + :rtype: torch.Tensor | LabelTensor + """ + # Raise error if input tensor does not have at least 4 dimensions + if batch["input"].dim() < 4: + raise ValueError( + "The provided input tensor must have at least 4 dimensions:" + " [trajectories, windows, time_steps, *features]." + f" Got shape {batch['input'].shape}." 
+ ) + + # Copy the kwargs to avoid modifying the original settings + kwargs = solver._kwargs.copy() + + # Extract the initial state and initialize the step-wise residuals list + current_state = batch["input"][:, :, 0] + residuals = [] + + # Iterate over the time steps + for step in range(1, batch["input"].shape[2]): + + # Pre-process, forward, and post-process the current state + processed_input = solver.preprocess_step(current_state, **kwargs) + output = solver.forward(processed_input) + predicted_state = solver.postprocess_step(output, **kwargs) + + # Retrieve the target and compute the step-wise residual + target_state = batch["input"][:, :, step] + step_residual = predicted_state - target_state + residuals.append(step_residual) + + # Update the current state for the next iteration + current_state = predicted_state + + # Stack the step-wise residuals + return torch.stack(residuals).as_subclass(torch.Tensor) + + @property + def input(self): + """ + The unrolled temporal input data. + + :return: The input data. 
+ :rtype: torch.Tensor | LabelTensor + """ + return self.data.input diff --git a/tests/test_condition/test_graph_time_series_condition.py b/tests/test_condition/test_graph_time_series_condition.py new file mode 100644 index 000000000..d0976d1d7 --- /dev/null +++ b/tests/test_condition/test_graph_time_series_condition.py @@ -0,0 +1,319 @@ +import pytest +import torch +from pina.data.manager import _TensorDataManager, _BatchManager +from pina._src.core.utils import labelize_forward +from pina.condition import TimeSeriesCondition +from pina import LabelTensor, Condition +from pina._src.condition.graph_time_series_condition import GraphTimeSeriesCondition +from pina.graph import RadiusGraph + +# Number of samples and time steps for testing +n_samples = 5 +n_graphs = 10 +n_nodes = 20 +time_steps = 10 + + +# Helper function to check tensor types +def _assert_tensor_type(t, use_lt): + if use_lt: + assert isinstance(t, LabelTensor) + else: + assert isinstance(t, torch.Tensor) and not isinstance(t, LabelTensor) + + +# Helper function to compute expected unroll windows +def _expected_unroll(data, n_windows, unroll_length, randomize): + + # Compute valid starting indices + last_idx = data.shape[1] - unroll_length + start_indices = torch.arange(last_idx + 1) + + # Randomize indices if required + if randomize: + start_indices = start_indices[torch.randperm(len(start_indices))] + + # Limit the number of windows + if n_windows is not None and n_windows < len(start_indices): + start_indices = start_indices[:n_windows] + + # Build expected windows + windows = [data[:, s : s + unroll_length] for s in start_indices] + + return torch.stack(windows, dim=1) + +# Helper function to create graph data +def _create_graph_data(is_input, use_lt): + + # If LabelTensor is used, create graph data with LabelTensors + if use_lt: + x = LabelTensor(torch.rand(n_graphs, n_nodes, 2), ["u", "v"]) + pos = LabelTensor(torch.rand(n_graphs, n_nodes, 2), ["x", "y"]) + tensor = 
LabelTensor(torch.rand(n_graphs, n_nodes, 2), ["f", "g"]) + + # Standard torch.Tensor without labels + else: + x = torch.rand(n_graphs, n_nodes, 2) + pos = torch.rand(n_graphs, n_nodes, 2) + tensor = torch.rand(n_graphs, n_nodes, 2) + + # Create a list of Graphs + graph = [ + RadiusGraph( + pos=pos[i], + radius=0.1, + x=x[i] if is_input else None, + y=x[i] if not is_input else None, + ) + for i in range(len(x)) + ] + + return graph, tensor + + +# Define a dummy solver for testing +class DummySolver: + + def __init__(self, use_lt, input_vars): + if use_lt: + self.forward = labelize_forward( + forward=self.forward, + input_variables=input_vars, + output_variables=input_vars, + ) + + self._params = None + self._kwargs = {} + self.aggregation_strategy = torch.mean + + def forward(self, samples): + return samples + + def preprocess_step(self, current_state, **kwargs): + return current_state + + def postprocess_step(self, predicted_state, **kwargs): + return predicted_state + + def _get_weights(self, condition_name, step_losses): + return 1.0 + + +@pytest.mark.parametrize("use_lt", [True, False]) +@pytest.mark.parametrize("n_windows", [4, 6]) +@pytest.mark.parametrize("unroll_length", [3, 5]) +@pytest.mark.parametrize("randomize", [True, False]) +def test_constructor(use_lt, n_windows, unroll_length, randomize): + + # Define the condition + input_tensor, _ = _create_graph_data(is_input=True, use_lt=use_lt) + condition = GraphTimeSeriesCondition( + input=input_tensor, + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + + # Assert correct types + assert isinstance(condition, TimeSeriesCondition) + # _assert_tensor_type(condition.input, use_lt) + + # Assert numerical parity + if not randomize: + expected_tensor = _expected_unroll( + input_tensor, n_windows, unroll_length, randomize + ) + assert torch.allclose(condition.input, expected_tensor) + + # Assert labels if LabelTensor is used + if use_lt: + assert condition.input.labels == ["u", "v"] 
+ + # Should fail if unroll_length is not a positive integer + with pytest.raises(AssertionError): + Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=0, + randomize=randomize, + ) + + # Should fail if n_windows is not a positive integer + with pytest.raises(AssertionError): + Condition( + input=input_tensor, + n_windows=0, + unroll_length=unroll_length, + randomize=randomize, + ) + + # Should fail if randomize is not a boolean value + with pytest.raises(ValueError): + Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=unroll_length, + randomize="not_a_boolean", + ) + + # Should fail if the input tensor has less than 3 dimensions + with pytest.raises(ValueError): + Condition( + input=torch.rand(n_samples, 2), + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + + # Should fail if unroll_length is not greater than 1 + with pytest.raises(ValueError): + Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=1, + randomize=randomize, + ) + + # Should fail if unroll_length is greater than the number of time steps + with pytest.raises(ValueError): + Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=time_steps + 1, + randomize=randomize, + ) + + # Should fail if n_windows is greater than the number of valid windows + with pytest.raises(ValueError): + Condition( + input=input_tensor, + n_windows=10, + unroll_length=unroll_length, + randomize=randomize, + ) + + +@pytest.mark.parametrize("use_lt", [True, False]) +@pytest.mark.parametrize("n_windows", [4, 6]) +@pytest.mark.parametrize("unroll_length", [3, 5]) +@pytest.mark.parametrize("randomize", [True, False]) +def test_get_item(use_lt, n_windows, unroll_length, randomize): + + # Define the condition + input_tensor = _create_tensor_data(use_lt) + condition = Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + + # Extract item using __getitem__ + index 
= 0 + item = condition[index] + + # Assert correct types + assert isinstance(item, _TensorDataManager) + _assert_tensor_type(item.input, use_lt) + + # Assert correct shapes + expected_shape = torch.Size([n_windows, unroll_length, 2]) + assert item.input.shape == expected_shape + + # Assert numerical parity + if not randomize: + expected_tensor = _expected_unroll( + input_tensor, n_windows, unroll_length, randomize + ) + assert torch.allclose(item.input, expected_tensor[index]) + + +@pytest.mark.parametrize("use_lt", [True, False]) +@pytest.mark.parametrize("n_windows", [4, 6]) +@pytest.mark.parametrize("unroll_length", [3, 5]) +@pytest.mark.parametrize("randomize", [True, False]) +def test_create_batch(use_lt, n_windows, unroll_length, randomize): + + # Define the condition + input_tensor = _create_tensor_data(use_lt) + condition = Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + + # Create batches using automatic batching or condition's collate_fn + idx = [0, 2] + data_to_collate = [condition.data[i] for i in idx] + batch_auto = condition.automatic_batching_collate_fn(data_to_collate) + batch_collate = condition.collate_fn(idx, condition) + + # Check that the automatic batch has been properly created + assert isinstance(batch_auto, _BatchManager) + assert hasattr(batch_auto, "input") + + # Check that the collate_fn batch has been properly created + assert isinstance(batch_collate, dict) + assert hasattr(batch_collate, "input") + + # Assert that the automatic batch input is correct + expected_shape = torch.Size([len(idx), n_windows, unroll_length, 2]) + assert batch_auto.input.shape == expected_shape + + # Assert that the collate_fn batch input is correct + expected_shape = torch.Size([len(idx), n_windows, unroll_length, 2]) + assert batch_collate.input.shape == expected_shape + + # Create input values + if not randomize: + expected_tensor = _expected_unroll( + input_tensor, n_windows, 
unroll_length, randomize + ) + assert torch.allclose(batch_collate.input, expected_tensor[idx]) + assert torch.allclose(batch_auto.input, expected_tensor[idx]) + + +@pytest.mark.parametrize("use_lt", [True, False]) +@pytest.mark.parametrize("n_windows", [4, 6]) +@pytest.mark.parametrize("unroll_length", [3, 5]) +@pytest.mark.parametrize("randomize", [True, False]) +def test_evaluate(use_lt, n_windows, unroll_length, randomize): + + # Define the input tensor + input_tensor = _create_tensor_data(use_lt) + input_vars = input_tensor.labels if use_lt else None + + # Define the condition and the solver + condition = Condition( + input=input_tensor, + n_windows=n_windows, + unroll_length=unroll_length, + randomize=randomize, + ) + solver = DummySolver(use_lt, input_vars) + loss_fn = torch.nn.MSELoss(reduction="none") + + # Extract the batch + batch = {"input": condition.input} + + # Evaluate the condition and compute the expected residuals + residuals = condition.evaluate(batch, solver) + + # Compute expected autoregressive step residuals + step_residuals = [] + current_state = batch["input"][:, :, 0] + + for step in range(1, batch["input"].shape[2]): + predicted_state = current_state + target_state = batch["input"][:, :, step] + + step_residual = predicted_state - target_state + step_residuals.append(step_residual) + + current_state = predicted_state + + expected = torch.stack(step_residuals).as_subclass(torch.Tensor) + + # Assert that the evaluated residuals are correct + assert torch.allclose(residuals, expected) diff --git a/tests/test_solver/test_autoregressive_single_model_solver.py b/tests/test_solver/test_autoregressive_single_model_solver.py index 226e68f87..f3b6a9401 100644 --- a/tests/test_solver/test_autoregressive_single_model_solver.py +++ b/tests/test_solver/test_autoregressive_single_model_solver.py @@ -10,13 +10,14 @@ # Settings for test purposes n_traj = 5 t_steps = 10 +n_dofs = 40 n_feats = 2 n_windows = 3 unroll_length = 5 # Helper function to create 
tensor data -def create_data(n_traj, t_steps, n_feats, use_lt): +def create_scalar_data(use_lt): # Define the data tensor data = torch.rand(n_traj, t_steps, n_feats) @@ -28,6 +29,14 @@ def create_data(n_traj, t_steps, n_feats, use_lt): else: return data +def create_vector_data(use_lt): + data = torch.rand(n_traj, t_steps, n_dofs, n_feats) + if use_lt: + labels = [f"feat_{i}" for i in range(n_feats)] + return LabelTensor(data, labels=labels) + else: + return data + # Define a dummy problem for testing class DummyProblem(BaseProblem): @@ -51,10 +60,12 @@ def __init__(self, data): @pytest.mark.parametrize("use_lt", [True, False]) @pytest.mark.parametrize("bool_value", [True, False]) @pytest.mark.parametrize("eps", [0.0, 1.0]) -def test_constructor(use_lt, bool_value, eps): +@pytest.mark.parametrize("create_data", [create_scalar_data, create_vector_data]) +@pytest.mark.parametrize("aggregation_strategy", [torch.mean, torch.sum]) +def test_constructor(use_lt, bool_value, eps, create_data, aggregation_strategy): - # Define the problem and model - data = create_data(n_traj, t_steps, n_feats, use_lt) + # Define the problem + data = create_data(use_lt) problem = DummyProblem(data) model = FeedForward(n_feats, n_feats, 10, 2) @@ -93,10 +104,12 @@ def test_constructor(use_lt, bool_value, eps): @pytest.mark.parametrize("use_lt", [True, False]) @pytest.mark.parametrize("batch_size", [None, 1, 2, 5]) -def test_solver_train(use_lt, batch_size): +@pytest.mark.parametrize("compile", [True, False]) +@pytest.mark.parametrize("create_data", [create_scalar_data, create_vector_data]) +def test_solver_train(use_lt, batch_size, compile, create_data): - # Define the problem and model - data = create_data(n_traj, t_steps, n_feats, use_lt) + # Define the problem + data = create_data(use_lt) problem = DummyProblem(data) model = FeedForward(n_feats, n_feats, 10, 2) @@ -122,10 +135,12 @@ def test_solver_train(use_lt, batch_size): @pytest.mark.parametrize("use_lt", [True, False]) 
@pytest.mark.parametrize("batch_size", [None, 1, 2, 5]) -def test_solver_validation(use_lt, batch_size): +@pytest.mark.parametrize("compile", [True, False]) +@pytest.mark.parametrize("create_data", [create_scalar_data, create_vector_data]) +def test_solver_validation(use_lt, batch_size, compile, create_data): - # Define the problem and model - data = create_data(n_traj, t_steps, n_feats, use_lt) + # Define the problem + data = create_data(use_lt) problem = DummyProblem(data) model = FeedForward(n_feats, n_feats, 10, 2) @@ -151,10 +166,12 @@ def test_solver_validation(use_lt, batch_size): @pytest.mark.parametrize("use_lt", [True, False]) @pytest.mark.parametrize("batch_size", [None, 1, 2, 5]) -def test_solver_test(use_lt, batch_size): +@pytest.mark.parametrize("compile", [True, False]) +@pytest.mark.parametrize("create_data", [create_scalar_data, create_vector_data]) +def test_solver_test(use_lt, batch_size, compile, create_data): - # Define the problem and model - data = create_data(n_traj, t_steps, n_feats, use_lt) + # Define the problem + data = create_data(use_lt) problem = DummyProblem(data) model = FeedForward(n_feats, n_feats, 10, 2) @@ -179,13 +196,11 @@ def test_solver_test(use_lt, batch_size): @pytest.mark.parametrize("use_lt", [True, False]) -def test_train_load_restore(clean_tmp_dir, use_lt): - - # Initialize the directory to store the checkpoints - dir = clean_tmp_dir +@pytest.mark.parametrize("create_data", [create_scalar_data, create_vector_data]) +def test_train_load_restore(use_lt, create_data): - # Define the problem and model - data = create_data(n_traj, t_steps, n_feats, use_lt) + # Define the problem + data = create_data(use_lt) problem = DummyProblem(data) model = FeedForward(n_feats, n_feats, 10, 2) From 6941932bd046a30bffe2980123b77484a091272c Mon Sep 17 00:00:00 2001 From: Nicola Demo Date: Thu, 11 Jun 2026 17:50:04 +0200 Subject: [PATCH 2/2] rst files for graph-ts-cond --- docs/source/_rst/_code.rst | 1 + 
.../_rst/condition/graph_time_series_condition.rst | 9 +++++++++ pina/condition/__init__.py | 2 ++ 3 files changed, 12 insertions(+) create mode 100644 docs/source/_rst/condition/graph_time_series_condition.rst diff --git a/docs/source/_rst/_code.rst b/docs/source/_rst/_code.rst index 0c289183e..6c8fb0f77 100644 --- a/docs/source/_rst/_code.rst +++ b/docs/source/_rst/_code.rst @@ -60,6 +60,7 @@ Conditions Condition Data Condition Domain Equation Condition + Graph Time Series Condition Input Equation Condition Input Target Condition diff --git a/docs/source/_rst/condition/graph_time_series_condition.rst b/docs/source/_rst/condition/graph_time_series_condition.rst new file mode 100644 index 000000000..3297d1b9b --- /dev/null +++ b/docs/source/_rst/condition/graph_time_series_condition.rst @@ -0,0 +1,9 @@ +Graph Time Series Condition +=========================== +.. currentmodule:: pina.condition.graph_time_series_condition + +.. automodule:: pina._src.condition.graph_time_series_condition + +.. autoclass:: pina._src.condition.graph_time_series_condition.GraphTimeSeriesCondition + :members: + :show-inheritance: \ No newline at end of file diff --git a/pina/condition/__init__.py b/pina/condition/__init__.py index f6df39bfa..0cfe18de0 100644 --- a/pina/condition/__init__.py +++ b/pina/condition/__init__.py @@ -15,6 +15,7 @@ "InputEquationCondition", "DataCondition", "TimeSeriesCondition", + "GraphTimeSeriesCondition", ] from pina._src.condition.condition_interface import ConditionInterface @@ -27,3 +28,4 @@ from pina._src.condition.input_equation_condition import InputEquationCondition from pina._src.condition.data_condition import DataCondition from pina._src.condition.time_series_condition import TimeSeriesCondition +from pina._src.condition.graph_time_series_condition import GraphTimeSeriesCondition