Skip to content

PLS-DA

Partial Least Squares Discriminant Analysis with double cross-validation.

Functions

motco.stats.pls.plsda_doubleCV(X, y, cv1_splits=7, cv2_splits=8, n_repeats=30, max_components=50, random_state=1203, n_jobs=1, progress=True)

Estimate a double cross validation on a partial least squares regression - discriminant analysis.

Parameters:

Name Type Description Default
X DataFrame

The predictor variables.

required
y Union[DataFrame, Series]

The outcome variable.

required
cv1_splits int

Number of folds in the CV1 loop. Default: 7.

7
cv2_splits int

Number of folds in the CV2 loop. Default: 8.

8
n_repeats int

Number of repeats to the cv2 procedure. Default: 30.

30
max_components int

Maximum number of LV to test. Default: 50.

50
random_state int

For reproducibility. Default: 1203.

1203
n_jobs int

Number of parallel workers for the inner CV loop. Use -1 for all available CPUs. Default: 1 (serial).

1
progress bool

Whether to display a progress bar over the outer CV folds. Default: True.

True

Returns:

Name Type Description
models_table dict[str, Union[PLSRegression, pd.DataFrame]]

Dictionary with the table of the best models, including repetition, number of latent variables, and AUROC. Also includes the model for prediction.

Source code in src/motco/stats/pls.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def plsda_doubleCV(
    X: pd.DataFrame,
    y: Union[pd.DataFrame, pd.Series],
    cv1_splits: int = 7,
    cv2_splits: int = 8,
    n_repeats: int = 30,
    max_components: int = 50,
    random_state: int = 1203,
    n_jobs: int = 1,
    progress: bool = True,
) -> dict[str, Union[PLSRegression, pd.DataFrame]]:
    """
    Estimate a double cross validation on a partial least squares
    regression - discriminant analysis.

    Parameters
    ----------
    X: pd.DataFrame
        The predictor variables.
    y: Union[pd.DataFrame, pd.Series]
        The outcome variable.
    cv1_splits: int
        Number of folds in the CV1 (inner) loop. Default: 7.
    cv2_splits: int
        Number of folds in the CV2 (outer) loop. Default: 8.
    n_repeats: int
        Number of repeats to the cv2 procedure. Default: 30.
    max_components: int
        Maximum number of LV to test (inclusive). Default: 50.
    random_state: int
        For reproducibility. Default: 1203.
    n_jobs: int
        Number of parallel workers for the inner CV loop. Use -1 for all
        available CPUs. Default: 1 (serial).
    progress: bool
        Whether to display a progress bar over the outer CV folds.
        Default: True.

    Returns
    -------
    models_table: dict[str, Union[PLSRegression, pd.DataFrame]]
        Dictionary with the table of the best models, including repetition,
        number of latent variables, and AUROC. Also includes the model for
        prediction.

    Raises
    ------
    ValueError
        If X and y have different numbers of rows, y has fewer than two
        classes, max_components exceeds the number of features in X, or X
        contains NaN/Inf values.
    """
    # --- input validation -------------------------------------------------
    _X_arr = np.asarray(X, dtype=float)
    _y_arr = np.asarray(y)
    _n_x = _X_arr.shape[0]
    _n_y = _y_arr.shape[0]
    if _n_x != _n_y:
        raise ValueError(
            f"X has {_n_x} rows but y has {_n_y} rows — number of rows must match."
        )
    _n_classes = len(np.unique(_y_arr))
    if _n_classes < 2:
        raise ValueError(
            f"y has {_n_classes} unique class(es); at least 2 are required."
        )
    if max_components > _X_arr.shape[1]:
        raise ValueError(
            f"max_components={max_components} exceeds the number of features in X "
            f"({_X_arr.shape[1]}); reduce max_components."
        )
    if not np.all(np.isfinite(_X_arr)):
        raise ValueError("X contains NaN or Inf values.")
    # One-hot encode the outcome so a multi-class y becomes a dummy matrix
    # suitable for PLS regression.
    encoder = OneHotEncoder(sparse_output=False)
    yd = pd.DataFrame(encoder.fit_transform(np.array(y).reshape(-1, 1)))
    # Outer (CV2) splitter is repeated; inner (CV1) splitter selects the
    # number of latent variables on the "rest" portion of each outer fold.
    cv2 = RepeatedStratifiedKFold(
        n_splits=cv2_splits, n_repeats=n_repeats, random_state=random_state
    )
    cv1 = StratifiedKFold(n_splits=cv1_splits)
    cv2_table = pd.DataFrame(np.zeros((cv2_splits, 2)))
    cv1_table = pd.DataFrame(np.zeros((cv1_splits, 2)))
    # Pre-fill the result table; "LV" and "AUROC" are placeholders that are
    # overwritten once each repeat completes.
    for_table = {
        "rep": list(range(1, n_repeats + 1)),
        "LV": list(range(1, n_repeats + 1)),
        "AUROC": [0.1] * n_repeats,
    }
    model_table = pd.DataFrame(for_table)
    row_cv2 = 0
    row_model_table = 0
    cv2_models = []
    best_models = []
    cv2_iter = tqdm(cv2.split(X, y), total=cv2_splits * n_repeats, desc="PLS-DA CV2", unit="fold", disable=not progress)
    for rest, test in cv2_iter:
        # Outer CV2 loop split into test and rest
        X_rest = X.iloc[rest, :]
        X_test = X.iloc[test, :]
        y_rest = y.iloc[rest]
        yd_rest = yd.iloc[rest, :]
        yd_test = yd.iloc[test, :]
        row_cv1 = 0
        for train, validation in cv1.split(X_rest, y_rest):
            # Inner CV validates optimal number of LVs
            X_train = X_rest.iloc[train, :]
            yd_train = yd_rest.iloc[train, :]
            X_val = X_rest.iloc[validation, :]
            yd_val = yd_rest.iloc[validation, :]
            # Candidate numbers of latent variables: 1..max_components
            # inclusive (range(1, max_components) would silently skip the
            # documented maximum).
            ns = list(range(1, max_components + 1))
            args = zip(ns, repeat(X_train), repeat(yd_train), repeat(X_val), repeat(yd_val))
            auroc: list[float]
            if n_jobs == 1:
                auroc = [_plsda_auroc(*a) for a in args]  # type: ignore[misc]
            else:
                n_workers = (
                    (multiprocessing.cpu_count() or 1) if n_jobs == -1 else max(1, n_jobs)
                )
                with multiprocessing.Pool(processes=n_workers) as pool:
                    auroc = pool.starmap(_plsda_auroc, args)  # type: ignore[arg-type]
            # ns starts at 1, so index + 1 is the winning LV count.
            nlv = auroc.index(max(auroc)) + 1
            cv1_table.iloc[row_cv1, 0] = nlv
            cv1_table.iloc[row_cv1, 1] = max(auroc)
            row_cv1 += 1
        # Obtain optimal n of components from the best-scoring inner fold
        best_cv1_idx = int(cv1_table[1].idxmax())
        n_components = int(cv1_table.iloc[best_cv1_idx, 0])  # type: ignore[arg-type]
        # Refit on the full "rest" portion and score on the held-out test fold.
        model_score = _plsda_auroc(
            n_components, X_rest, yd_rest, X_test, yd_test, return_full=True
        )
        cv2_table.iloc[row_cv2, 0] = n_components
        cv2_table.iloc[row_cv2, 1] = model_score["score"]  # type: ignore[index]
        cv2_models.append(model_score["model"])  # type: ignore[index]
        row_cv2 += 1
        if row_cv2 == cv2_splits:
            # One full repeat of the outer CV is done: record its best fold
            # and reset the per-repeat accumulators.
            best_cv2_idx = int(cv2_table[1].idxmax())
            best_cv2_lv = int(cv2_table.iloc[best_cv2_idx, 0])  # type: ignore[arg-type]
            auroc_val = cv2_table.iloc[best_cv2_idx, 1]
            model_table.iloc[row_model_table, 1] = best_cv2_lv
            model_table.iloc[row_model_table, 2] = auroc_val
            best_models.append(cv2_models[best_cv2_idx])
            row_model_table += 1
            cv2_table = pd.DataFrame(np.zeros((cv2_splits, 2)))
            cv2_models = []
            row_cv2 = 0
    models_table = {"models": best_models, "table": model_table}
    return models_table

motco.stats.pls.calculate_vips(model, components=None)

Estimates Variable Importance in Projection (VIP) in Partial Least Squares (PLS)

Parameters:

Name Type Description Default
model

model generated from the PLSRegression function

required
components Union[None, list[int]]

if not None, a list of integers indicating the components to compute the VIPs from. If None, all components are taken into account. Default None.

None

Returns:

Name Type Description
vips array

variable importance in projection for each variable

Source code in src/motco/stats/pls.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def calculate_vips(
    model,
    components: Union[None, list[int]] = None,
) -> np.ndarray:
    """
    Estimates Variable Importance in Projection (VIP)
    in Partial Least Squares (PLS)

    Parameters
    ----------
    model: PLSRegression
        model generated from the PLSRegression function
    components: Union[None, list[int]]
        if not None, a list of integers indicating the components to compute
        the VIPs from. If None, all components are taken into account.
        Default None.

    Returns
    -------
    vips: np.array
        variable importance in projection for each variable
    """
    # Pull scores (T), weights (W) and y-loadings (Q), optionally restricted
    # to the requested subset of components.
    if components is None:
        scores = model.x_scores_
        weights = model.x_weights_
        loadings = model.y_loadings_
    else:
        scores = model.x_scores_[:, components]
        weights = model.x_weights_[:, components]
        loadings = model.y_loadings_[:, components]
    n_features, n_comp = weights.shape

    # Per-component explained sum of squares: diag(T'T Q'Q), as a column.
    ss = np.diag(scores.T @ scores @ loadings.T @ loadings).reshape(n_comp, -1)
    ss_total = np.sum(ss)

    # Normalize each weight column to unit length, then combine the squared
    # normalized weights with the per-component explained variance.
    unit_weights = weights / np.linalg.norm(weights, axis=0, keepdims=True)
    contrib = (unit_weights ** 2) @ ss.ravel()
    return np.sqrt(n_features * contrib / ss_total)