Skip to content

Trajectory Analysis

Group trajectory differences: magnitude, orientation, and shape.

Design matrix

motco.stats.design.get_model_matrix(X, group_col, level_col, full=True)

Build a design (model) matrix for group × level factors.

Coding scheme
  • Intercept (column of ones).
  • Group main effects: one-hot with drop-first for groups (G-1 columns).
  • Level main effects: one-hot with drop-first for levels (L-1 columns).
  • If full=True, include all interaction terms between group and level dummies: (G-1) × (L-1) columns.

The category order is deterministic: sorted by string representation.

Parameters:

Name Type Description Default
X DataFrame

DataFrame containing group_col and level_col.

required
group_col str

Name of the group column.

required
level_col str

Name of the level/state column.

required
full bool

Whether to include interaction terms.

True

Returns:

Type Description
ndarray

Model matrix with intercept.

Source code in src/motco/stats/design.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def get_model_matrix(
    X: pd.DataFrame,
    group_col: str,
    level_col: str,
    full: bool = True,
) -> np.ndarray:
    """
    Construct the design (model) matrix for a group × level factorial layout.

    Coding scheme
    -------------
    - Intercept (column of ones).
    - Group main effects: treatment (drop-first) coding, G-1 columns.
    - Level main effects: treatment (drop-first) coding, L-1 columns.
    - With `full=True`, every group-dummy × level-dummy product is appended:
      (G-1) × (L-1) interaction columns.

    Categories are ordered deterministically by their string representation.

    Parameters
    ----------
    X: pd.DataFrame
        DataFrame containing `group_col` and `level_col`.
    group_col: str
        Name of the group column.
    level_col: str
        Name of the level/state column.
    full: bool
        Whether to include interaction terms.

    Returns
    -------
    np.ndarray
        Model matrix with intercept.
    """
    # Validate that both factor columns exist and carry at least two levels.
    for col, param in [(group_col, "group_col"), (level_col, "level_col")]:
        if col not in X.columns:
            raise ValueError(
                f"{param}='{col}' not found in X. Available columns: {list(X.columns)}."
            )
        n_unique = X[col].nunique()
        if n_unique < 2:
            raise ValueError(
                f"{param}='{col}' has {n_unique} unique value(s); at least 2 are required."
            )
    # Deterministic category order: sorted string labels.
    grp = X[group_col].astype(str)
    lvl = X[level_col].astype(str)
    grp_order = sorted(pd.unique(grp).tolist())
    lvl_order = sorted(pd.unique(lvl).tolist())

    # Treatment coding: one indicator per non-baseline category.
    grp_dummies = [(grp == cat).to_numpy(dtype=float) for cat in grp_order[1:]]
    lvl_dummies = [(lvl == cat).to_numpy(dtype=float) for cat in lvl_order[1:]]

    columns = [np.ones(len(X), dtype=float)]
    columns.extend(grp_dummies)
    columns.extend(lvl_dummies)
    if full:
        # Interactions iterate group-major, level-minor, matching the
        # (G-1) × (L-1) layout expected by `build_ls_means`.
        for g_ind in grp_dummies:
            for l_ind in lvl_dummies:
                columns.append(g_ind * l_ind)
    return np.column_stack(columns)

motco.stats.design.build_ls_means(group_levels, level_levels, full=True)

Generate LS-mean rows for every group × level cell consistent with get_model_matrix coding.

Parameters:

Name Type Description Default
group_levels Sequence[str]

Sorted group labels; first is baseline.

required
level_levels Sequence[str]

Sorted level labels; first is baseline.

required
full bool

Whether to include interaction terms.

True

Returns:

Type Description
ndarray

LS-mean design matrix with shape (G×L, 1 + (G-1) + (L-1) + I), where I = (G-1)×(L-1) if full=True else 0. Row order is by group major, then level minor.

Source code in src/motco/stats/design.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def build_ls_means(
    group_levels: Sequence[str],
    level_levels: Sequence[str],
    full: bool = True,
) -> np.ndarray:
    """
    Generate LS-mean rows for every group × level cell, using the same
    column layout as `get_model_matrix`.

    Parameters
    ----------
    group_levels: Sequence[str]
        Sorted group labels; first is baseline.
    level_levels: Sequence[str]
        Sorted level labels; first is baseline.
    full: bool
        Whether to include interaction terms.

    Returns
    -------
    np.ndarray
        LS-mean design matrix with shape (G×L, 1 + (G-1) + (L-1) + I), where
        I = (G-1)×(L-1) if `full=True` else 0. Row order is by group major,
        then level minor.
    """
    g_labels = list(group_levels)
    l_labels = list(level_levels)
    n_g = len(g_labels)
    n_l = len(l_labels)
    g_extra = max(n_g - 1, 0)
    l_extra = max(n_l - 1, 0)
    n_inter = g_extra * l_extra if full else 0
    out = np.zeros((max(n_g, 1) * max(n_l, 1), 1 + g_extra + l_extra + n_inter), dtype=float)

    # Column offsets: [intercept | group dummies | level dummies | interactions]
    g_off = 1
    l_off = g_off + g_extra
    i_off = l_off + l_extra

    for gi in range(n_g):
        for li in range(n_l):
            r = gi * n_l + li  # group-major, level-minor row order
            out[r, 0] = 1.0
            # Drop-first coding: baseline (index 0) gets no dummy column.
            if gi:
                out[r, g_off + gi - 1] = 1.0
            if li:
                out[r, l_off + li - 1] = 1.0
            if full and gi and li:
                out[r, i_off + (gi - 1) * l_extra + (li - 1)] = 1.0
    return out

motco.stats.design.center_matrix(dat, group_col, level_col, feature_cols=None)

Center feature columns by per-group means.

Parameters:

Name Type Description Default
dat DataFrame

Original, non-centered dataframe.

required
group_col str

Column in dat indicating the group (between-subject factor).

required
level_col str

Column in dat indicating the level/state (within-group factor).

required
feature_cols Sequence[str] | None

Feature columns to center. If None, all numeric columns except group_col and level_col are used.

None

Returns:

Type Description
DataFrame

A copy of dat with selected feature columns centered within groups.

Source code in src/motco/stats/design.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def center_matrix(
    dat: pd.DataFrame,
    group_col: str,
    level_col: str,
    feature_cols: Sequence[str] | None = None,
) -> pd.DataFrame:
    """
    Center feature columns by per-group means.

    Parameters
    ----------
    dat: pd.DataFrame
        Original, non-centered dataframe.
    group_col: str
        Column in `dat` indicating the group (between-subject factor).
    level_col: str
        Column in `dat` indicating the level/state (within-group factor).
    feature_cols: Sequence[str] | None
        Feature columns to center. If None, all numeric columns except
        `group_col` and `level_col` are used.

    Returns
    -------
    pd.DataFrame
        A copy of `dat` with selected feature columns centered within groups.
    """
    if feature_cols is not None:
        missing = [c for c in feature_cols if c not in dat.columns]
        if missing:
            raise ValueError(
                f"feature_cols contains column(s) not found in dat: {missing}."
            )
    datc = dat.copy()
    if feature_cols is None:
        feature_cols = [
            c
            for c in datc.select_dtypes(include=[np.number]).columns.tolist()
            if c not in {group_col, level_col}
        ]
    if not feature_cols:
        return datc
    # Center within groups using group-wise means
    datc.loc[:, feature_cols] = (
        datc.loc[:, feature_cols]
        - datc.groupby(group_col)[feature_cols].transform("mean")
    )
    return datc

Estimation

motco.stats.trajectory.estimate_betas(X, Y)

Estimate the beta coefficients between an outcome matrix and a model matrix

Parameters:

Name Type Description Default
X Union[DataFrame, ndarray]

Model matrix with intercept.

required
Y Union[DataFrame, ndarray]

Outcome matrix.

required

Returns:

Name Type Description
betas Union[DataFrame, ndarray]

Beta coefficients

Source code in src/motco/stats/trajectory.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def estimate_betas(
    X: Union[pd.DataFrame, np.ndarray], Y: Union[pd.DataFrame, np.ndarray]
) -> Union[pd.DataFrame, np.ndarray]:
    """
    Estimate the beta coefficients between an outcome matrix
    and a model matrix

    Parameters
    ----------
    X: Union[pd.DataFrame, np.ndarray]
        Model matrix with intercept.
    Y: Union[pd.DataFrame, np.ndarray]
        Outcome matrix.

    Returns
    -------
    betas: Union[pd.DataFrame, np.ndarray]
        Beta coefficients
    """
    # Work on float arrays; remember Y's columns so a DataFrame comes back out.
    design = np.asarray(X, dtype=float)
    if isinstance(Y, pd.DataFrame):
        outcome = Y.to_numpy(dtype=float)
        frame_cols = Y.columns
    else:
        outcome = np.asarray(Y, dtype=float)
        frame_cols = None

    # Normal equations, solved by factorization with graceful degradation.
    gram = design.T @ design
    moment = design.T @ outcome
    try:
        # Cholesky: fastest and most stable route when the Gram matrix is SPD.
        chol = np.linalg.cholesky(gram)
        coeffs = np.linalg.solve(chol.T, np.linalg.solve(chol, moment))
    except np.linalg.LinAlgError:
        logger.warning("Cholesky decomposition failed; falling back to direct solve. Check for near-singular XtX.")
        try:
            # Direct solve of the normal equations.
            coeffs = np.linalg.solve(gram, moment)
        except np.linalg.LinAlgError:
            logger.warning("Direct solve failed; falling back to lstsq. Model matrix may be rank-deficient.")
            # Least-squares on the raw matrices: tolerates rank deficiency
            # and avoids the conditioning penalty of forming X'X.
            coeffs, *_ = np.linalg.lstsq(design, outcome, rcond=None)

    if frame_cols is None:
        return coeffs
    # Re-wrap as a DataFrame so downstream matmul keeps pandas semantics
    # and column labels follow Y.
    return pd.DataFrame(coeffs, columns=frame_cols)

motco.stats.trajectory.get_observed_vectors(X, Y, group_col, level_col, full=True)

Get LS-mean vectors for each group × level cell.

Parameters:

Name Type Description Default
X DataFrame

DataFrame containing factors group_col and level_col for each row corresponding to Y.

required
Y Union[DataFrame, ndarray]

Outcome matrix (n_samples × n_features).

required
group_col str

Group column name in X.

required
level_col str

Level/state column name in X.

required
full bool

Whether to include interactions in the model.

True

Returns:

Type Description
DataFrame

LS means arranged with a MultiIndex (group, level). Columns follow Y.

Source code in src/motco/stats/trajectory.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def get_observed_vectors(
    X: pd.DataFrame,
    Y: Union[pd.DataFrame, np.ndarray],
    group_col: str,
    level_col: str,
    full: bool = True,
) -> pd.DataFrame:
    """
    Get LS-mean vectors for each group × level cell.

    Parameters
    ----------
    X: pd.DataFrame
        DataFrame containing factors `group_col` and `level_col` for each row
        corresponding to `Y`.
    Y: Union[pd.DataFrame, np.ndarray]
        Outcome matrix (n_samples × n_features).
    group_col: str
        Group column name in `X`.
    level_col: str
        Level/state column name in `X`.
    full: bool
        Whether to include interactions in the model.

    Returns
    -------
    pd.DataFrame
        LS means arranged with a MultiIndex (group, level). Columns follow `Y`.
    """
    # Fit the (optionally full-interaction) model on the two factor columns.
    design = get_model_matrix(X[[group_col, level_col]], group_col, level_col, full)
    coeffs = estimate_betas(design, Y)

    # LS-mean rows use the same deterministic (sorted-string) category order
    # that get_model_matrix used to code the design.
    grp_order = sorted(pd.unique(X[group_col].astype(str)).tolist())
    lvl_order = sorted(pd.unique(X[level_col].astype(str)).tolist())
    cell_rows = build_ls_means(grp_order, lvl_order, full)
    cell_means = cell_rows @ np.asarray(coeffs, dtype=float)

    # Label rows by (group, level) cell and columns by feature.
    index = pd.MultiIndex.from_product(
        [grp_order, lvl_order], names=[group_col, level_col]
    )
    if isinstance(Y, pd.DataFrame):
        columns = Y.columns
    else:
        columns = [f"f{i}" for i in range(cell_means.shape[1])]  # type: ignore[assignment]
    return pd.DataFrame(cell_means, index=index, columns=columns)

motco.stats.trajectory.estimate_difference(Y, model_matrix, LS_means, contrast)

Estimate parameters angle, delta, and shape given an outcome matrix, model matrix, and contrast to compare. This is a comparison of more than two states.

Parameters:

Name Type Description Default
Y Union[DataFrame, ndarray]

Outcome matrix.

required
model_matrix Union[DataFrame, ndarray]

Model matrix with intercept.

required
LS_means Union[DataFrame, ndarray]

Least-squares means to estimate.

required
contrast list[list[int]]

Indices indicating the groups to compare based on LS means. Each list must contain the cohorts that belong to the same group.

required

Returns:

Name Type Description
deltas ndarray

Symmetric matrix (n_groups x n_groups) with differences in magnitude.

angles ndarray

Symmetric matrix (n_groups x n_groups) with differences in direction (degrees).

shapes ndarray

Symmetric matrix (n_groups x n_groups) with shape distances.

Notes

See [1]_ for more information on trajectory analysis.

References

.. [1] Adams, Dean C., and Michael L. Collyer. "A general framework for the analysis of phenotypic trajectories in evolutionary studies." Evolution: International Journal of Organic Evolution 63.5 (2009): 1143-1154. https://doi.org/10.1111/j.1558-5646.2009.00649.x

Source code in src/motco/stats/trajectory.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def estimate_difference(
    Y: Union[pd.DataFrame, np.ndarray],
    model_matrix: Union[pd.DataFrame, np.ndarray],
    LS_means: Union[pd.DataFrame, np.ndarray],
    contrast: list[list[int]],
) -> tuple:
    """
    Estimate parameters angle, delta, and shape given an outcome
    matrix, model matrix, and contrast to compare. This is a comparison
    of more than two states.

    Parameters
    ----------
    Y: Union[pd.DataFrame, np.ndarray]
        Outcome matrix.
    model_matrix: Union[pd.DataFrame, np.ndarray]
        Model matrix with intercept.
    LS_means: Union[pd.DataFrame, np.ndarray]
        Least-squares means to estimate.
    contrast: list[list[int]]
        Indices indicating the groups to compare based on LS means.
        Each list must contain the cohorts that belong to the same group.

    Returns
    -------
    deltas: np.ndarray
        Symmetric matrix (n_groups x n_groups) with differences in magnitude.
    angles: np.ndarray
        Symmetric matrix (n_groups x n_groups) with differences in direction (degrees).
    shapes: np.ndarray
        Symmetric matrix (n_groups x n_groups) with shape distances.

    Notes
    -----
    See [1]_ for more information on trajectory analysis.

    References
    ----------
    .. [1] Adams, Dean C., and Michael L. Collyer.
           "A general framework for the analysis of phenotypic trajectories in
           evolutionary studies."
           Evolution: International Journal of Organic Evolution 63.5 (2009):
           1143-1154.
           https://doi.org/10.1111/j.1558-5646.2009.00649.x
    """
    # --- Input validation ---
    y_arr = np.asarray(Y, dtype=float)
    x_arr = np.asarray(model_matrix, dtype=float)
    ls_arr = np.asarray(LS_means, dtype=float)
    if y_arr.shape[0] != x_arr.shape[0]:
        raise ValueError(
            f"Y has {y_arr.shape[0]} rows but model_matrix has {x_arr.shape[0]} rows — "
            "number of rows must match."
        )
    if ls_arr.shape[1] != x_arr.shape[1]:
        raise ValueError(
            f"LS_means has {ls_arr.shape[1]} columns but model_matrix has {x_arr.shape[1]} columns — "
            "number of columns must match."
        )
    n_cells = ls_arr.shape[0]
    for group_i, members in enumerate(contrast):
        for cell in members:
            if not (0 <= cell < n_cells):
                raise ValueError(
                    f"contrast[{group_i}] contains index {cell}, but LS_means only has {n_cells} rows "
                    f"(valid indices: 0–{n_cells - 1})."
                )
    if not np.all(np.isfinite(y_arr)):
        raise ValueError("Y contains NaN or Inf values.")
    if not np.all(np.isfinite(x_arr)):
        raise ValueError("model_matrix contains NaN or Inf values.")
    # --- End validation ---
    n_groups = len(contrast)
    betas = estimate_betas(model_matrix, Y)
    # LS-mean vectors per cell; kept as a DataFrame for the helper functions.
    obs_vect = pd.DataFrame(
        np.matmul(np.asarray(LS_means, dtype=float), np.asarray(betas, dtype=float))
    )
    # Per-group orientation (unit direction) and size of each trajectory.
    orientations = []
    sizes = []
    for members in contrast:
        orientations.append(_estimate_orientation(obs_vect, members))
        sizes.append(_estimate_size(obs_vect, members))
    shapes = _estimate_shape(obs_vect, contrast)

    deltas = np.zeros((n_groups, n_groups))
    angles = np.zeros((n_groups, n_groups))
    # Pairwise comparisons over the upper triangle, mirrored to keep symmetry.
    for i in range(n_groups):
        for j in range(i + 1, n_groups):
            size_gap = np.abs(sizes[i] - sizes[j])
            # Orientations come from SVD, so the inner product is already a
            # cosine; clip to guard arccos against round-off.
            cosine = np.clip(np.inner(orientations[i], orientations[j]), -1.0, 1.0)
            degrees = np.arccos(cosine) * 180 / np.pi
            deltas[i, j] = size_gap
            deltas[j, i] = size_gap
            angles[i, j] = degrees
            angles[j, i] = degrees
    return deltas, angles, shapes

motco.stats.trajectory.pair_difference(dat, group_col, level_col, groups=None, levels=None, feature_cols=None)

Estimate difference in direction (angle, degrees) and magnitude (delta) between two groups across two levels.

The change vector for a group is defined as level1 - level2 over the selected feature columns.

Parameters:

Name Type Description Default
dat DataFrame

DataFrame containing features plus group_col and level_col.

required
group_col str

Column with groups (between-subject factor).

required
level_col str

Column with levels/states (within-subject factor).

required
groups tuple[str, str] | None

Pair of group labels to compare. If None, infer and require exactly two.

None
levels tuple[str, str] | None

Pair of level labels to use for the change vector. If None, infer and require exactly two.

None
feature_cols Sequence[str] | None

Feature columns to use. If None, all numeric columns except group_col and level_col are used.

None

Returns:

Type Description
tuple[float, float]

(angle_degrees, delta_magnitude_difference)

Notes

See [1]_ for more information on two-state comparisons.

References

.. [1] Collyer, Michael L., and Dean C. Adams. "Analysis of two‐state multivariate phenotypic change in ecological studies." Ecology 88.3 (2007): 683-692. https://doi.org/10.1890/06-0727

Source code in src/motco/stats/trajectory.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def pair_difference(
    dat: pd.DataFrame,
    group_col: str,
    level_col: str,
    groups: tuple[str, str] | None = None,
    levels: tuple[str, str] | None = None,
    feature_cols: Sequence[str] | None = None,
) -> tuple[float, float]:
    """
    Estimate difference in direction (angle, degrees) and magnitude (delta)
    between two groups across two levels.

    The change vector for a group is defined as `level1 - level2` over the
    selected feature columns.

    Parameters
    ----------
    dat: pd.DataFrame
        DataFrame containing features plus `group_col` and `level_col`.
    group_col: str
        Column with groups (between-subject factor).
    level_col: str
        Column with levels/states (within-subject factor).
    groups: tuple[str, str] | None
        Pair of group labels to compare. If None, infer and require exactly two.
    levels: tuple[str, str] | None
        Pair of level labels to use for the change vector. If None, infer and
        require exactly two.
    feature_cols: Sequence[str] | None
        Feature columns to use. If None, all numeric columns except `group_col`
        and `level_col` are used.

    Returns
    -------
    tuple[float, float]
        (angle_degrees, delta_magnitude_difference)

    Notes
    -----
    See [1]_ for more information on two-state comparisons.

    References
    ----------
    .. [1] Collyer, Michael L., and Dean C. Adams. "Analysis of two‐state
           multivariate phenotypic change in ecological studies." Ecology 88.3
           (2007): 683-692. https://doi.org/10.1890/06-0727
    """
    if feature_cols is None:
        feature_cols = [
            c
            for c in dat.select_dtypes(include=[np.number]).columns.tolist()
            if c not in {group_col, level_col}
        ]
    if not feature_cols:
        raise ValueError("No feature columns provided or detected.")

    g_vals = sorted(pd.unique(dat[group_col].astype(str)).tolist())
    l_vals = sorted(pd.unique(dat[level_col].astype(str)).tolist())
    if groups is None:
        if len(g_vals) != 2:
            raise ValueError(
                f"Expected exactly 2 groups, found {len(g_vals)}: {g_vals}"
            )
        groups = (g_vals[0], g_vals[1])
    if levels is None:
        if len(l_vals) != 2:
            raise ValueError(
                f"Expected exactly 2 levels, found {len(l_vals)}: {l_vals}"
            )
        levels = (l_vals[0], l_vals[1])

    # Compute per (group, level) means
    means = (
        dat.assign(
            __g=dat[group_col].astype(str), __l=dat[level_col].astype(str)
        )
        .groupby(["__g", "__l"])[feature_cols]
        .mean()
    )
    try:
        y_g1 = np.asarray(means.loc[(groups[0], levels[0])], dtype=float) - np.asarray(
            means.loc[(groups[0], levels[1])], dtype=float
        )
        y_g2 = np.asarray(means.loc[(groups[1], levels[0])], dtype=float) - np.asarray(
            means.loc[(groups[1], levels[1])], dtype=float
        )
    except KeyError as e:
        raise ValueError(
            "Missing combinations for the requested groups/levels in the data."
        ) from e

    d1 = float(np.linalg.norm(y_g1))
    d2 = float(np.linalg.norm(y_g2))
    delta = abs(d1 - d2)
    if d1 == 0 or d2 == 0:
        raise ValueError(
            "Zero-magnitude change vector for at least one group; angle is undefined."
        )
    angle = float(np.degrees(np.arccos(np.inner(y_g1 / d1, y_g2 / d2))))
    return angle, delta

Permutation test

motco.stats.permutation.RRPP(Y, model_full, model_reduced, LS_means, contrast, permutations=999, n_jobs=None, progress=True, seed=None)

Residual Randomization in a Permutation Procedure to evaluate linear models.

Parameters:

Name Type Description Default
Y Union[DataFrame, ndarray]

Outcome matrix.

required
model_full Union[DataFrame, ndarray]

Model matrix for full model, including intercept.

required
model_reduced Union[DataFrame, ndarray]

Model matrix for reduced model, including intercept.

required
LS_means Union[DataFrame, ndarray]

Least-squares means to estimate.

required
contrast list[list[int]]

Indices indicating the groups to compare based on LS means. Each list must contain the cohorts that belong to the same group.

required
permutations int

Number of permutations.

999
n_jobs Optional[int]

If provided and > 1, run permutations in parallel using multiple worker processes. Use -1 to use all available CPUs. When None or 1, runs single-threaded (backward-compatible default).

None
seed Optional[int]

Optional seed for reproducible residual randomization.

None

Returns:

Name Type Description
dist_delta list[ndarray]

Distribution of delta (magnitude-difference) matrices, one per permutation.

dist_angle list[ndarray]

Distribution of angle matrices, one per permutation.

dist_shape list[ndarray]

Distribution of shape-distance matrices, one per permutation.

Source code in src/motco/stats/permutation.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def RRPP(
    Y: Union[pd.DataFrame, np.ndarray],
    model_full: Union[pd.DataFrame, np.ndarray],
    model_reduced: Union[pd.DataFrame, np.ndarray],
    LS_means: Union[pd.DataFrame, np.ndarray],
    contrast: list[list[int]],
    permutations: int = 999,
    n_jobs: Optional[int] = None,
    progress: bool = True,
    seed: Optional[int] = None,
) -> tuple:
    """
    Residual Randomization in a Permutation Procedure to evaluate
    linear models.

    Residuals of the reduced model are permuted across observations, added
    back to the reduced-model predictions, and the trajectory statistics are
    recomputed on each permuted dataset to build null distributions.

    Parameters
    ----------
    Y: Union[pd.DataFrame, np.ndarray]
        Outcome matrix.
    model_full: Union[pd.DataFrame, np.ndarray]
        Model matrix for full model, including intercept.
    model_reduced: Union[pd.DataFrame, np.ndarray]
        Model matrix for reduced model, including intercept.
    LS_means: Union[pd.DataFrame, np.ndarray]
        Least-squares means to estimate.
    contrast: list[list[int]]
        Indices indicating the groups to compare based on LS means.
        Each list must contain the cohorts that belong to the same group.
    permutations: int
        Number of permutations.
    n_jobs: Optional[int]
        If provided and > 1, run permutations in parallel using multiple
        worker processes. Use -1 to use all available CPUs. When None or 1,
        runs single-threaded (backward-compatible default).
    progress: bool
        Whether to display a progress bar (serial path only).
    seed: Optional[int]
        Optional seed for reproducible residual randomization.

    Returns
    -------
    dist_delta: list[np.ndarray]
        Distribution of delta (magnitude-difference) matrices, one per
        permutation.
    dist_angle: list[np.ndarray]
        Distribution of angle matrices, one per permutation.
    dist_shape: list[np.ndarray]
        Distribution of shape-distance matrices, one per permutation.
    """
    # --- Input validation ---
    _Y = np.asarray(Y, dtype=float)
    _Xf = np.asarray(model_full, dtype=float)
    _Xr = np.asarray(model_reduced, dtype=float)
    _LS = np.asarray(LS_means, dtype=float)
    if _Y.shape[0] != _Xf.shape[0]:
        raise ValueError(
            f"Y has {_Y.shape[0]} rows but model_full has {_Xf.shape[0]} rows — "
            "number of rows must match."
        )
    if _Y.shape[0] != _Xr.shape[0]:
        raise ValueError(
            f"Y has {_Y.shape[0]} rows but model_reduced has {_Xr.shape[0]} rows — "
            "number of rows must match."
        )
    if _LS.shape[1] != _Xf.shape[1]:
        raise ValueError(
            f"LS_means has {_LS.shape[1]} columns but model_full has {_Xf.shape[1]} columns — "
            "number of columns must match."
        )
    _n_ls = _LS.shape[0]
    for _gi, _group in enumerate(contrast):
        for _idx in _group:
            if not (0 <= _idx < _n_ls):
                raise ValueError(
                    f"contrast[{_gi}] contains index {_idx}, but LS_means only has {_n_ls} rows "
                    f"(valid indices: 0–{_n_ls - 1})."
                )
    if not np.all(np.isfinite(_Y)):
        raise ValueError("Y contains NaN or Inf values.")
    if not np.all(np.isfinite(_Xf)):
        raise ValueError("model_full contains NaN or Inf values.")
    if not np.all(np.isfinite(_Xr)):
        raise ValueError("model_reduced contains NaN or Inf values.")
    # --- End validation ---
    # Set Y to be pandas df (ensures estimate_betas returns a DataFrame, so
    # the matmul below also yields a DataFrame with an index to align)
    Y = pd.DataFrame(Y)
    # Set-up permutation procedure
    betas_red = estimate_betas(model_reduced, Y)
    # Predicted values from reduced model
    y_hat = np.matmul(model_reduced, betas_red)
    y_hat.index = Y.index
    # Residuals of the reduced model (these are the permuted units)
    y_res = Y - y_hat
    # Prepare NumPy views to avoid pandas inside the permutation loop
    y_hat_np = np.asarray(y_hat)
    y_res_np = np.asarray(y_res)
    # Prepare containers
    deltas: list[np.ndarray] = []
    angles: list[np.ndarray] = []
    shapes: list[np.ndarray] = []

    # Serial path
    if n_jobs in (None, 1):
        n = y_res_np.shape[0]
        rng = np.random.default_rng(seed)
        for _ in tqdm(range(permutations), desc="RRPP", unit="perm", disable=not progress):
            # Shuffle residual rows and re-attach to the reduced-model fit
            idx = rng.permutation(n)
            y_random = y_hat_np + y_res_np[idx, :]
            d, a, s = estimate_difference(y_random, model_full, LS_means, contrast)
            deltas.append(d)
            angles.append(a)
            shapes.append(s)
        return deltas, angles, shapes

    # Parallel path
    n_workers = (os.cpu_count() or 1) if n_jobs == -1 else max(1, n_jobs or 1)
    n_workers = min(n_workers, max(1, permutations))
    logger.info("Running %d permutations across %d workers", permutations, n_workers)
    # Split the permutation count as evenly as possible across workers
    base = permutations // n_workers
    rem = permutations % n_workers
    counts = [base + (1 if i < rem else 0) for i in range(n_workers)]

    # Independent, reproducible child RNG streams derived from `seed`
    ss = np.random.SeedSequence(seed)
    seeds = [int(s.generate_state(1)[0]) for s in ss.spawn(n_workers)]

    worker = _RRPPWorker(y_hat_np, y_res_np, model_full, LS_means, contrast)
    with multiprocessing.Pool(processes=n_workers) as pool:
        parts = pool.starmap(worker, zip(counts, seeds))

    # Flatten per-worker results, preserving worker order
    for d_list, a_list, s_list in parts:
        deltas.extend(d_list)
        angles.extend(a_list)
        shapes.extend(s_list)

    return deltas, angles, shapes