Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,5 @@ venv.bak/
dmypy.json

# Pyre type checker
.pyre/
.pyre/
.DS_Store
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ A complete usage guide is available as comments in each function. The followi
idSys = PSID.PSID(y, z, nx, n1, i)
# Or, if modeling effect of input u is also of interest
idSys = PSID.IPSID(y, z, u, nx, n1, i)
# Or, to fit B while constraining the y-output feedthrough Dy to zero
idSys = PSID.IPSID(y, z, u, nx, n1, i, fit_Dy=False)
```
Passing `fit_Dy=False` constrains only `Dy` to zero. It does not constrain `Dz`, which may still be learned in IPSID configurations that model z with input feedthrough.
Inputs:
- y and z are time x dimension matrices with neural (e.g. LFP signal powers or spike counts) and behavioral data (e.g. joint angles, hand position, etc), respectively.
- IPSID also takes u as an input, which is a time x dimension matrix, containing the measured input data.
Expand Down
67 changes: 50 additions & 17 deletions source/PSID/IPSID.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def recomputeObsAndStates(A, C, i, YHat, YHatMinus):
return Xk, Xk_Plus1


def computeBD(A, C, Yii, Xk_Plus1, Xk, i, nu, Uf):
def computeBD(A, C, Yii, Xk_Plus1, Xk, i, nu, Uf, fit_Dy=True):
"""
Computes matrices corresponding to the effect of external input
Returns:
Expand Down Expand Up @@ -146,10 +146,21 @@ def computeBD(A, C, Yii, Xk_Plus1, Xk, i, nu, Uf):
LHS = LHS + np.kron(Uf[(ii * nu) : (ii * nu + nu), :].T, NN @ RMul)
NNAll.append(NN)

DBVec = np.linalg.lstsq(LHS, PP.flatten(order="F"), rcond=None)[0]
DB = np.reshape(DBVec, [nx + ny, nu], order="F")
D = DB[:ny, :]
B = DB[ny : (ny + nx), :]
rhs = PP.flatten(order="F")
if fit_Dy:
DBVec = np.linalg.lstsq(LHS, rhs, rcond=None)[0]
DB = np.reshape(DBVec, [nx + ny, nu], order="F")
D = DB[:ny, :]
B = DB[ny : (ny + nx), :]
else:
keep_cols = []
block_size = nx + ny
for input_idx in range(nu):
block_start = input_idx * block_size
keep_cols.extend(range(block_start + ny, block_start + block_size))
BVec = np.linalg.lstsq(LHS[:, keep_cols], rhs, rcond=None)[0]
B = np.reshape(BVec, [nx, nu], order="F")
D = np.zeros((ny, nu))
return B, D


Expand Down Expand Up @@ -280,6 +291,7 @@ def IPSID(
WS=dict(),
return_WS=False,
fit_Cz_via_KF=True,
fit_Dy=True,
time_first=True,
remove_mean_Y=True,
remove_mean_Z=True,
Expand Down Expand Up @@ -367,33 +379,37 @@ def IPSID(
- (9) fit_Cz_via_KF (default: True): if True (preferred option),
refits Cz more accurately using a KF after all other
parameters are learned
- (10) time_first (default: True): if True, will expect the time dimension
- (10) fit_Dy (default: True): if False, constrains the y-output feedthrough
parameter Dy to zero while still fitting B. This does not
constrain Dz, which may still be learned in IPSID paths that
model z with input feedthrough.
- (11) time_first (default: True): if True, will expect the time dimension
of the data to be the first dimension (e.g. Z is T x nz). If False,
will expect time to be the second dimension in all data
(e.g. Z is nz x T).
- (11) remove_mean_Y: if True will remove the mean of Y.
- (12) remove_mean_Y: if True will remove the mean of Y.
Must be True if data is not zero mean. Defaults to True.
- (12) remove_mean_Z: if True will remove the mean of Z.
- (13) remove_mean_Z: if True will remove the mean of Z.
Must be True if data is not zero mean. Defaults to True.
- (13) remove_mean_U: if True will remove the mean of U.
- (14) remove_mean_U: if True will remove the mean of U.
Must be True if data is not zero mean. Defaults to True.
- (14) zscore_Y: if True will z-score Y. It is ok to set this to False,
- (15) zscore_Y: if True will z-score Y. It is ok to set this to False,
but setting to True may help with stopping some dimensions of
data from dominating others. Defaults to False.
- (15) zscore_Z: if True will z-score Z. It is ok to set this to False,
- (16) zscore_Z: if True will z-score Z. It is ok to set this to False,
but setting to True may help with stopping some dimensions of
data from dominating others. Defaults to False.
- (16) zscore_U: if True will z-score U. It is ok to set this to False,
- (17) zscore_U: if True will z-score U. It is ok to set this to False,
but setting to True may help with stopping some dimensions of
data from dominating others. Defaults to False.
- (17) missing_marker (default: None): if not None, will discard samples of Z that
- (18) missing_marker (default: None): if not None, will discard samples of Z that
equal to missing_marker when fitting Cz. Only effective if fit_Cz_via_KF is
True.
- (18) remove_nonYrelated_fromX1 (default: False): If remove_nonYrelated_fromX1=True, the direct effect
- (19) remove_nonYrelated_fromX1 (default: False): If remove_nonYrelated_fromX1=True, the direct effect
of input u(k) on z(k) would be excluded from x1(k) in additional step 1 (preprocessing stage).
If False, additional step 1 won't happen and x3 (and its corresponding model parameters
[A33, B3, Cz3 and noise statistics related to x3]) won't be learned even if n3>0 provided.
- (19) n_pre (default: np.inf): preprocessing dimension used in additional step 1.
- (20) n_pre (default: np.inf): preprocessing dimension used in additional step 1.
Additional step 1 only happens if remove_nonYrelated_fromX1=True.
Large values of n_pre (assuming there is enough data to fit models with
such large state dimensions) would ensure all dynamics of Y are preserved in
Expand All @@ -402,7 +418,7 @@ def IPSID(
(all available SVD dimensions).
If n_pre=0, Additional steps 1 and 2 won't happen and x3 won't be learned
(remove_nonYrelated_fromX1 will be set to False, n3 will be 0).
- (20) n3: number of latent states x3(k) in the optional additional step 2.
- (21) n3: number of latent states x3(k) in the optional additional step 2.

Outputs:
- (1) idSys: an LSSM object with the system parameters for
Expand All @@ -429,9 +445,12 @@ def IPSID(
a special case of IPSID. To do so, simply set Z=None and n1=0.
(6) NDM (or SID, i.e., Standard Subspace Identification without input U, unsupervised by Z) can be performed as
a special case of IPSID. To do so, simply set Z=None, U=None and n1=0.
(7) Setting fit_Dy=False constrains only Dy to zero. It does not constrain Dz, which may
still be learned depending on Z, U, and additional-step settings.

Usage example:
idSys = IPSID(Y, Z, U, nx=nx, n1=n1, i=i); # With external input
idSys = IPSID(Y, Z=None, U=U, nx=nx, n1=0, i=i, fit_Dy=False); # With external input and Dy constrained to zero
idSys = IPSID(Y, Z, U, nx=nx, n1=n1, remove_nonYrelated_fromX1=True, n_pre=n_pre, i=i); # With external input and preprocessing x1(k)
idSys = IPSID(Y, Z, U, nx=nx, n1=n1, remove_nonYrelated_fromX1=True, n_pre=n_pre, n3=n3, i=i); # With external input, preprocessing x1(k) and optional states x3(k)
idSysPSID = IPSID(Y, Z, nx=nx, n1=n1, i=i); # No external input: PSID
Expand Down Expand Up @@ -547,6 +566,20 @@ def IPSID(
): # Due to provided settings, preprocessing step is disabled and X3 won't be learned.
remove_nonYrelated_fromX1, n_pre, n3 = False, 0, 0

if not fit_Dy:
if nu == 0:
warnings.warn(
"fit_Dy=False has no effect because no input U was provided or inferred."
)
if nu > 0 and nz > 0 and not remove_nonYrelated_fromX1:
warnings.warn(
"fit_Dy=False only constrains Dy; Dz may still be learned when Z and U are both provided."
)
if nu > 0 and n3 > 0:
warnings.warn(
"fit_Dy=False does not constrain Dz introduced by additional step 2 (n3 > 0)."
)

if n1 > 0 and nz > 0:
if n1 > iZ * nz:
raise (
Expand Down Expand Up @@ -794,7 +827,7 @@ def IPSID(

# Recompute Oy and Oy_Minus using A and Cy and recompute Xk and Xk_Plus1 using the new Oy
Xk, Xk_Plus1 = recomputeObsAndStates(A, Cy, iY, YHat, YHatMinus)
B, Dy = computeBD(A, Cy, Yii, Xk_Plus1, Xk, iY, nu, Uf)
B, Dy = computeBD(A, Cy, Yii, Xk_Plus1, Xk, iY, nu, Uf, fit_Dy=fit_Dy)
s.changeParams({"B": B, "D": Dy})

s.Cz = Cz
Expand Down
200 changes: 200 additions & 0 deletions source/PSID/tests/test_IPSID.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""
Copyright (c) 2020 University of Southern California
See full notice in LICENSE.md

Tests IPSID-specific functionality.
"""

import os
import sys
import unittest
import warnings

import numpy as np
from scipy import linalg

sys.path.insert(0, os.path.dirname(__file__))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))


def make_input_driven_data(num_steps=300, include_z=False, seed=0):
    """Generate a 1D input-driven LSSM realization for IPSID tests.

    Builds a fixed scalar state-space model, drives it with white-noise
    input, and returns the tuple ``(y, z, u)``. ``z`` is ``None`` unless
    *include_z* is True, in which case it is a noisy linear readout of the
    state plus a direct input feedthrough term.
    """
    from PSID.LSSM import LSSM

    rng = np.random.default_rng(seed)
    # Draw the input first so the RNG stream matches across seeds.
    u = rng.standard_normal((num_steps, 1))
    model = LSSM(
        params={
            "A": np.array([[0.85]]),
            "B": np.array([[0.45]]),
            "C": np.array([[1.0]]),
            "D": np.array([[0.3]]),
            "Q": np.array([[0.02]]),
            "R": np.array([[0.03]]),
            "S": np.zeros((1, 1)),
        }
    )
    y, x = model.generateRealization(num_steps, u=u)
    if not include_z:
        return y, None, u
    # Behavior readout: state-driven plus direct feedthrough from u.
    z = 1.2 * x + 0.25 * u + 0.02 * rng.standard_normal((num_steps, 1))
    return y, z, u


def collect_warning_messages(func, *args, **kwargs):
    """Call *func* while recording every warning it emits.

    Returns a tuple ``(result, messages)`` where *result* is the value
    returned by ``func(*args, **kwargs)`` and *messages* is a list of the
    captured warning messages rendered as strings.
    """
    with warnings.catch_warnings(record=True) as records:
        # "always" ensures repeated warnings are not deduplicated away.
        warnings.simplefilter("always")
        outcome = func(*args, **kwargs)
    return outcome, [str(record.message) for record in records]


def solve_expected_bd(A, C, Yii, Xk_Plus1, Xk, i, nu, Uf, fit_Dy):
    """Reference least-squares solution for the (B, D) identification step.

    Mirrors the computeBD math independently: assembles the Kronecker block
    regressor relating the future input matrix ``Uf`` to the one-step state
    and output residuals, then solves for B and D jointly (``fit_Dy=True``)
    or for B alone with D pinned to zero (``fit_Dy=False``).

    Returns the tuple ``(B, D)``.
    """
    from PSID.IPSID import computeObsFromAC

    obs, obs_minus = computeObsFromAC(A, C, i)
    # Residual of the state/output equations once A and C are accounted for.
    residual = np.concatenate((Xk_Plus1 - A @ Xk, Yii - C @ Xk))

    nx, ny = A.shape[0], C.shape[0]
    a_proj = A @ np.linalg.pinv(obs)
    c_proj = C @ np.linalg.pinv(obs)
    zm = np.concatenate((np.zeros((nx, ny)), np.linalg.pinv(obs_minus)), axis=1)

    regressor = np.zeros((residual.size, (nx + ny) * nu))
    right_mul = linalg.block_diag(np.eye(ny), obs_minus)
    for blk in range(i):
        width = (i - blk) * ny
        coeff = np.zeros((nx + ny, i * ny))
        coeff[:nx, :width] = zm[:, (blk * ny):] - a_proj[:, (blk * ny):]
        coeff[nx:, :width] = -c_proj[:, (blk * ny):]
        if blk == 0:
            # The current output block carries a direct identity term.
            coeff[nx:, :ny] += np.eye(ny)
        regressor += np.kron(Uf[(blk * nu):(blk * nu + nu), :].T, coeff @ right_mul)

    target = residual.flatten(order="F")
    if fit_Dy:
        stacked = np.linalg.lstsq(regressor, target, rcond=None)[0]
        stacked = np.reshape(stacked, [nx + ny, nu], order="F")
        return stacked[ny:, :], stacked[:ny, :]

    # Keep only the B columns of every per-input block; D is fixed at zero.
    b_cols = [blk * (nx + ny) + ny + j for blk in range(nu) for j in range(nx)]
    b_flat = np.linalg.lstsq(regressor[:, b_cols], target, rcond=None)[0]
    return np.reshape(b_flat, [nx, nu], order="F"), np.zeros((ny, nu))


class TestIPSID(unittest.TestCase):
    """Tests for the fit_Dy option in computeBD and the IPSID entry point."""

    def test_computeBD_can_constrain_Dy_to_zero(self):
        """computeBD matches the reference solver with and without Dy."""
        from PSID.IPSID import computeBD

        rng = np.random.default_rng(0)
        A = np.array([[0.9, 0.1], [0.0, 0.8]])
        C = np.array([[1.0, 0.2], [0.3, 0.7]])
        horizon, n_inputs, n_samples = 3, 2, 25
        # Draw order matters for RNG reproducibility: states, next states,
        # outputs, then future inputs.
        states = rng.standard_normal((A.shape[0], n_samples))
        states_next = rng.standard_normal((A.shape[0], n_samples))
        outputs = rng.standard_normal((C.shape[0], n_samples))
        inputs_future = rng.standard_normal((horizon * n_inputs, n_samples))
        args = (A, C, outputs, states_next, states, horizon, n_inputs, inputs_future)

        # Default path: D is fit jointly with B.
        B_fit, D_fit = computeBD(*args)
        B_ref, D_ref = solve_expected_bd(*args, fit_Dy=True)
        np.testing.assert_allclose(B_fit, B_ref)
        np.testing.assert_allclose(D_fit, D_ref)

        # Constrained path: D forced to zero, B refit on its own columns.
        B_con, D_con = computeBD(*args, fit_Dy=False)
        B_ref0, D_ref0 = solve_expected_bd(*args, fit_Dy=False)
        np.testing.assert_allclose(B_con, B_ref0)
        np.testing.assert_allclose(D_con, D_ref0)
        np.testing.assert_allclose(D_con, np.zeros_like(D_con))

    def test_ipsid_fit_Dy_false_sets_D_zero_for_input_model(self):
        """fit_Dy=False yields D == 0 while B is still estimated."""
        from PSID.IPSID import IPSID

        y, _, u = make_input_driven_data(seed=1)
        fitted, msgs = collect_warning_messages(
            IPSID, y, Z=None, U=u, nx=1, n1=0, i=10, fit_Dy=False
        )
        self.assertEqual(fitted.B.shape, (1, 1))
        np.testing.assert_allclose(fitted.D, np.zeros_like(fitted.D))
        # U is present, so the "has no effect" warning must not fire.
        self.assertFalse(any("fit_Dy=False has no effect" in m for m in msgs))

    def test_ipsid_fit_Dy_false_warns_when_no_input_is_provided(self):
        """Without U, fit_Dy=False is a no-op and should warn."""
        from PSID.IPSID import IPSID

        y, _, _ = make_input_driven_data(seed=2)
        _, msgs = collect_warning_messages(
            IPSID, y, Z=None, U=None, nx=1, n1=0, i=10, fit_Dy=False
        )
        self.assertTrue(any("fit_Dy=False has no effect" in m for m in msgs))

    def test_ipsid_fit_Dy_false_warns_that_Dz_may_still_be_learned(self):
        """With Z and U both given, a Dz caveat warning is expected."""
        from PSID.IPSID import IPSID

        y, z, u = make_input_driven_data(include_z=True, seed=3)
        _, msgs = collect_warning_messages(
            IPSID,
            y,
            Z=z,
            U=u,
            nx=1,
            n1=1,
            i=10,
            fit_Dy=False,
            remove_nonYrelated_fromX1=False,
        )
        self.assertTrue(any("Dz may still be learned" in m for m in msgs))

    def test_ipsid_fit_Dy_false_does_not_warn_about_Dz_when_preprocessing_forces_zero(self):
        """Preprocessing with n3=0 zeroes Dz, so no Dz warnings should fire."""
        from PSID.IPSID import IPSID

        y, z, u = make_input_driven_data(include_z=True, seed=4)
        fitted, msgs = collect_warning_messages(
            IPSID,
            y,
            Z=z,
            U=u,
            nx=1,
            n1=1,
            i=10,
            fit_Dy=False,
            remove_nonYrelated_fromX1=True,
            n_pre=1,
            n3=0,
        )
        self.assertFalse(any("Dz may still be learned" in m for m in msgs))
        self.assertFalse(any("additional step 2" in m for m in msgs))
        np.testing.assert_allclose(fitted.Dz, np.zeros_like(fitted.Dz))

    def test_ipsid_fit_Dy_false_warns_when_additional_stage_can_introduce_Dz(self):
        """n3 > 0 re-introduces a possible Dz, which should be warned about."""
        from PSID.IPSID import IPSID

        y, z, u = make_input_driven_data(include_z=True, seed=5)
        _, msgs = collect_warning_messages(
            IPSID,
            y,
            Z=z,
            U=u,
            nx=2,
            n1=1,
            i=10,
            fit_Dy=False,
            remove_nonYrelated_fromX1=True,
            n_pre=1,
            n3=1,
        )
        self.assertTrue(any("additional step 2" in m for m in msgs))


# Allow running this test module directly, outside a pytest/CI harness.
if __name__ == "__main__":
    unittest.main()