Source code for sclkme.tools._kme

from typing import Any, Dict, Literal, Optional, Union

import numpy as np
import pandas as pd
from anndata import AnnData
from numpy.random import RandomState
from scanpy.tools._utils import _choose_representation
from scipy.sparse import issparse
from sklearn.metrics import pairwise_distances
from sklearn.utils import check_array, check_random_state

from ._utils import random_feats

_KernelCapable = Literal[
    "additive_chi2",
    "chi2",
    "linear",
    "poly",
    "polynomial",
    "rbf",
    "laplacian",
    "sigmoid",
    "cosine",
]

SHIFT_INVARIANT_KERNEL = ["rbf"]


[docs]def kernel_mean_embedding( adata: AnnData, partition_key: str, X_anchor: np.ndarray, n_pcs: Optional[str] = None, use_rep: Optional[str] = None, random_state: Union[None, int, RandomState] = 0, method: Literal["exact", "approx"] = "exact", kernel: _KernelCapable = "rbf", kernel_kwds: Dict[str, Any] = {}, n_jobs: Optional[int] = None, key_added: Optional[str] = None, copy=False, ) -> Optional[AnnData]: """\ Landmark-based Kernel Mean Embedding *Kernel mean embedding* (KME) is a method used in machine learning and statistics to map probability distribution into a Reproducing Kernel Hilbert Space (RKHS). After embedding distributions into a RKHS, we can work with distributions in a more tractable way. Here, we applied the `KME` to sample distributions. By evaluating the transformed distributions in the RKHS at the landmark cells, we align them at the same landmark space, and generate a sample-by-landmark kernel score matrix as sample embeddings. .. note:: To know more about the background of kernel mean embedding, please refer to this book :cite:`kme2017`. Parameters ---------- adata Annotated data of type `anndata.AnnData` partition_key Column key in the field of `adata.obs` indicating the sample ids X_anchor An numpy ndarray as embeddings of landmark cells. **Note**: `X_anchor` should have the same dimension with the representation to be used in the `adata`. n_pcs Use this many PCs. If `n_pcs==0` use `.X` if `use_rep is None` use_rep Use the indicated representation. `'X'` or any key for `.obsm` is valid. If `None`, the representation is chosen automatically: For `.n_vars` < 50, `.X` is used, otherwise 'X_pca' is used. If 'X_pca' is not present, it's computed with default parameters. random_state A random seed which supports an `int`, `None` and `numpy.random.RandomState` method method to use for kernel calculation, supporting two options `exact` an `approx`. `exact` means directly calculate the kernel matrix. `approx` means that the kernel is approximated by random fourier features :cite:`rahimi2007random`. kernel kernels to use. We support many kernels like `rbf`, `linear`, and `laplacian`. For more kernels, see the global var: `sclkme.tl._kme._KernelCapable`. kernel_kwds A dic holding extra kwargs for the function of `sklearn.metrics.pairwise_kernels` in the calculation of kernels between cells. n_jobs number of cpus to use in the calculation of kernels. `n_jobs = 0 or None` means only use one CPU. `n_jobs > 0 and n_jobs < MAX_CPUS_AVAILABLE` will use that number of cpus. `n_jobs = -1` means using all CPUs. For more details, check the document of `sklearn.metrics.pairwise_kernels`. key_added If not specified, the results including params are stored in `.uns['kme']` as a dict, and sample embeddings are stored in `.obs['X_kme']`. If specified, the results are stored in `.uns[f{key_added}_kme]`, and the sample embeddings are stored in `.obsm['X_kme']`. copy Return a copy instead of writing the results into `adata`. Returns ------- Depending on `copy`, updates or returns `adata` with the following fields. **X_kme** : `adata.obsm` field sample embeddings of data **kme** or **{key_added}_kme**: `adata.uns` field results from kernel mean embeddings, including both parameters and sample embeddings Example ------- >>> import sclkme >>> import scanpy as sc Load annotated dataset: >>> adata = sc.datasets.pbmc3k_processed() Run kernel mean embedding to generate sample embeddings .. note:: Here, we just use the `louvain` field in `adata.obs` as **sample ids** for illustration. In your application, you need to change it to your real sample ids. >>> sclkme.tl.sketch(adata, n_sketch=128, use_rep="X_pca") >>> X_anchor = adata[adata.obs['sketch']].obsm['X_pca'].copy() >>> sclkme.tl.kernel_mean_embedding(adata, partition_key='louvain', use_rep="X_pca", X_anchor=X_anchor) """ # first we run the sketching algorithm to find the anchors adata = adata.copy() if copy else adata if key_added is None: key_added = "kme" else: key_added = key_added + "_kme" adata.uns[key_added] = {} kme_dict = adata.uns[key_added] kme_dict["partition_key"] = partition_key kme_dict["params"] = {"kernel": kernel, "method": method} kme_dict["params"]["random_state"] = random_state if kernel_kwds: kme_dict["params"]["kernel_kwds"] = kernel_kwds X = _choose_representation(adata, use_rep=use_rep, n_pcs=n_pcs, silent=True) if issparse(X): X = X.toarray() if use_rep is not None: kme_dict["params"]["use_rep"] = use_rep if n_pcs is not None: kme_dict["params"]["n_pcs"] = n_pcs # check the input anchor matrix X_anchor = check_array( X_anchor, dtype=np.float32, ensure_2d=True, accept_sparse=False ) if X_anchor.shape[1] != X.shape[1]: raise ValueError( f"the shape: {X_anchor.shape} of input anchor matrix \ cannot match the data representation shape in `adata`." ) if method == "exact": from sklearn.metrics import pairwise_kernels # compute the kernels between X and X_anchor if kernel == "rbf": X_kme = rbf_kernel( X, X_anchor, gamma=kernel_kwds.get("gamma", 1), n_jobs=n_jobs ) elif kernel == "laplacian": X_kme = laplacian_kernel( X=X, Y=X_anchor, gamma=kernel_kwds.get("gamma", 1), n_jobs=n_jobs ) else: X_kme = pairwise_kernels( X=X, Y=X_anchor, metric=kernel, n_jobs=n_jobs, **kernel_kwds ) # make the kernel matrix sparse if necessary mask = X_kme > 1e-14 X_kme[~mask] = 0 elif method == "approx": if kernel not in SHIFT_INVARIANT_KERNEL: raise ValueError( f"Only shift invariant kernels: [{','.join(SHIFT_INVARIANT_KERNEL)}] " "support `method=approx` for kernel mean embedding." ) kme_dict["params"]["kernel_kwds"]["gamma"] = kernel_kwds.get("gamma", 1) kme_dict["params"]["kernel_kwds"]["D"] = kernel_kwds.get("D", 2000) random_state = check_random_state(random_state) phi_X = random_feats( X, gamma=kme_dict["params"]["kernel_kwds"]["gamma"], D=kme_dict["params"]["kernel_kwds"]["D"], random_state=random_state, ) phi_X_anchor = random_feats( X_anchor, gamma=kme_dict["params"]["kernel_kwds"]["gamma"], D=kme_dict["params"]["kernel_kwds"]["D"], random_state=random_state, ) # outer multiplication X_kme = phi_X @ phi_X_anchor.T adata.obsm["X_kme"] = X_kme # save the results # save the resulting embeddings based on partition_key df_partition = pd.DataFrame( adata.obsm["X_kme"].copy(), columns=[f"X_kme_{i}" for i in range(adata.obsm["X_kme"].shape[1])], ) df_partition[partition_key] = adata.obs[partition_key].tolist() df_partition = df_partition.groupby(partition_key, sort=False).aggregate(np.mean) kme_dict[f"{partition_key}_kme"] = df_partition return adata if copy else None
def rbf_kernel(X, Y=None, gamma=None, normalized=True, n_jobs=None): """Compute the rbf (gaussian) kernel between X and Y K(X, Y) = exp(-gamma ||x - y||^2 / Den) for each pair of rows in X and y in Y. Note: Den := np.max(Dsq, axis=0) / 4 """ Dsq = pairwise_distances(X, Y, metric="euclidean", squared=True, n_jobs=n_jobs) Den = np.max(Dsq, axis=0) / 4 if normalized else 1 if gamma is None: gamma = 1.0 / X.shape[1] K = np.exp(-gamma * Dsq / Den) return K def laplacian_kernel(X, Y, gamma=None, normalized=True, n_jobs=None): """Compute the laplacian kernel between X and Y. K(X, Y) = exp(-gamma ||x - y||_1 / Den) for each pair of rows x in X and y in Y. """ D_manhattan = pairwise_distances(X, Y, metric="manhattan", n_jobs=n_jobs) Den = np.max(D_manhattan, axis=0) / 4 if normalized else 1 if gamma is None: gamma = 1.0 / X.shape[1] K = np.exp(-gamma * D_manhattan / Den) return K