Source code for kalelinear.estimator._artl

# =============================================================================
# Author: Shuo Zhou, shuo.zhou@sheffield.ac.uk, The University of Sheffield
# =============================================================================
import numpy as np
from numpy.linalg import multi_dot
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils.validation import check_is_fitted

from kalelinear.estimator.base import BaseDomainAdaptationEstimator
from kalelinear.utils import lap_norm, mmd_coef, to_numpy

# =============================================================================
# Adaptation Regularisation Transfer Learning: ARTL
# Ref: Long, M., Wang, J., Ding, G., Pan, S.J. and Philip, S.Y., 2013.
# Adaptation regularization: A general framework for transfer learning.
# IEEE Transactions on Knowledge and Data Engineering, 26(5), pp.1076-1089.
# =============================================================================


def _init_artl(estimator, Xs, ys, X_target=None, y_target=None, **kwargs):
    """[summary]

    Parameters
    ----------
    Xs : array-like
        Source data, shape (n_source_samples, n_features)
    ys : array-like
        Source labels, shape (n_source_samples,)
    X_target : array-like
        Target data, shape (n_target_samples, n_features), the first
        n_labeled_target_samples samples are labeled and should be aligned
        with y_target if provided.
    y_target : array-like, optional
        Target label, shape (n_labeled_target_samples, ), by default None

    Returns
    -------
    X : array-like
        [description]
    y : array-like

    x_kernel_matrix : array-like

    M : array-like

    unit_matrix : array-like

    """

    Xs = to_numpy(Xs)
    ys = to_numpy(ys)
    X_target = to_numpy(X_target)
    y_target = to_numpy(y_target)
    if type(X_target) is np.ndarray:
        X = np.concatenate([Xs, X_target], axis=0)
        ns = Xs.shape[0]
        nt = X_target.shape[0]
        M = mmd_coef(ns, nt, ys, y_target, kind="joint")
    else:
        X = Xs.copy()
        M = np.zeros((X.shape[0], X.shape[0]))

    if y_target is not None:
        y = np.concatenate([ys, y_target])
    else:
        y = ys.copy()
    X, y, _, x_kernel_matrix, unit_matrix, _, _ = estimator._prepare_kernel_fit_data(X, y, **kwargs)

    return X, y, x_kernel_matrix, M, unit_matrix


def _prepare_artl_fit_data(
    estimator,
    X,
    y,
    covariates=None,
    target_covariate=None,
    unlabeled_value=None,
    **kwargs,
):
    split = estimator._split_source_target_by_covariate(
        X,
        y,
        covariates,
        target_covariate=target_covariate,
        unlabeled_value=unlabeled_value,
    )
    estimator.source_idx_ = split["source_idx"]
    estimator.target_idx_ = split["target_idx"]
    estimator.target_fit_idx_ = split["target_fit_idx"]
    estimator.target_covariate_ = split["target_covariate"]
    return _init_artl(
        estimator,
        split["Xs"],
        split["ys"],
        split["X_target"],
        split["y_target"],
        **kwargs,
    )


[docs] class ARSVM(BaseDomainAdaptationEstimator): def __init__( self, C=1.0, kernel="linear", lambda_=1.0, gamma_=0.0, k_neighbour=5, solver="osqp", manifold_metric="cosine", knn_mode="distance", **kwargs, ): """Adaptation Regularised Support Vector Machine Parameters ---------- C : float, optional param for importance of slack variable, by default 1.0 kernel : str, optional 'rbf' | 'linear' | 'poly' , by default 'linear' lambda_ : float, optional MMD regulisation param, by default 1.0 gamma_ : float, optional manifold regulisation param, by default 0.0 k_neighbour : int, optional number of nearest numbers for each sample in manifold regularisation, by default 5 solver : str, optional solver to solve quadprog, osqp or cvxopt, by default 'osqp' manifold_metric : str, optional The distance metric used to calculate the k-Neighbors for each sample point. The DistanceMetric class gives a list of available metrics. By default 'cosine'. knn_mode : str, optional {‘connectivity’, ‘distance’}, by default 'distance'. Type of returned matrix: ‘connectivity’ will return the connectivity matrix with ones and zeros, and ‘distance’ will return the distances between neighbors according to the given metric. kwargs : kernel param """ self.kwargs = kwargs self.kernel = kernel self.lambda_ = lambda_ self.C = C self.gamma_ = gamma_ self.solver = solver self.k_neighbour = k_neighbour # self.alpha = None self.knn_mode = knn_mode self.manifold_metric = manifold_metric self._lb = LabelBinarizer(pos_label=1, neg_label=-1) # self.scaler = StandardScaler()
[docs] def fit(self, X, y, covariates=None, target_covariate=None, unlabeled_value=None): """Fit the model according to the given training data. Parameters ---------- X : array-like Source and target data, shape (n_samples, n_features). y : array-like Source labels only, or one label per row in ``X``. If full-length labels include unlabeled target rows, mark them with ``unlabeled_value``. covariates : array-like, optional Binary domain labels aligned with ``X``. The value specified by ``target_covariate`` identifies target rows; all other rows are treated as source rows. target_covariate : scalar, optional Domain value identifying target samples. Defaults to the last sorted unique covariate value. unlabeled_value : scalar, optional Sentinel used for unlabeled target rows when ``y`` is full length. """ X, y, x_kernel_matrix, M, unit_matrix = _prepare_artl_fit_data( self, X, y, covariates=covariates, target_covariate=target_covariate, unlabeled_value=unlabeled_value, metric=self.kernel, filter_params=True, **self.kwargs, ) y_ = self._lb.fit_transform(y) if self.gamma_ != 0: lap_mat = lap_norm(X, n_neighbour=self.k_neighbour, mode=self.knn_mode) Q_ = unit_matrix + multi_dot([(self.lambda_ * M + self.gamma_ * lap_mat), x_kernel_matrix]) else: Q_ = unit_matrix + multi_dot([(self.lambda_ * M), x_kernel_matrix]) self.coef_, self.support_ = self._solve_semi_dual(x_kernel_matrix, y_, Q_, self.C, self.solver) # if self._lb.y_type_ == 'binary': # self.support_vectors_ = X[:nl, :][self.support_] # self.n_support_ = self.support_vectors_.shape[0] # else: # self.support_vectors_ = [] # self.n_support_ = [] # for i in range(y_.shape[1]): # self.support_vectors_.append(X[:nl, :][self.support_[i]][-1]) # self.n_support_.append(self.support_vectors_[-1].shape[0]) self.X = X self.y = y return self
[docs] def decision_function(self, X): """Evaluates the decision function for the samples in X. Parameters ---------- X : array-like shape (n_samples, n_features) Returns ------- array-like decision scores, , shape (n_samples,) for binary classification, (n_samples, n_classes) for multi-class cases """ check_is_fitted(self, "X") check_is_fitted(self, "y") # x_fit = self.X x_np = to_numpy(X) x_kernel_matrix = pairwise_kernels(x_np, self.X, metric=self.kernel, filter_params=True, **self.kwargs) scores = np.dot(x_kernel_matrix, self.coef_) return scores # +self.intercept_
[docs] def predict(self, X): """Perform classification on samples in X. Parameters ---------- X : array-like shape (n_samples, n_features) Returns ------- array-like predicted labels, , shape (n_samples, ) """ dec = to_numpy(self.decision_function(X)) return self._lb.inverse_transform(dec, threshold=0)
[docs] def fit_predict(self, X, y, covariates=None, target_covariate=None, unlabeled_value=None): """Fit the model according to the given training data and then perform classification on target samples. Parameters ---------- X : array-like Combined source and target data. y : array-like Source labels or full-length labels. covariates : array-like, optional Binary domain labels aligned with ``X``. """ self.fit( X, y, covariates=covariates, target_covariate=target_covariate, unlabeled_value=unlabeled_value, ) return self.predict(to_numpy(X)[self.target_idx_])
[docs] class ARRLS(BaseDomainAdaptationEstimator): def __init__( self, kernel="linear", lambda_=1.0, gamma_=0.0, sigma_=1.0, k_neighbour=5, manifold_metric="cosine", knn_mode="distance", **kwargs, ): """Adaptation Regularised Least Square Parameters ---------- kernel : str, optional 'rbf' | 'linear' | 'poly', by default 'linear' lambda_ : float, optional MMD regularisation param, by default 1.0 gamma_ : float, optional manifold regularisation param, by default 0.0 sigma_ : float, optional l2 regularisation param, by default 1.0 k_neighbour : int, optional number of nearest numbers for each sample in manifold regularisation, by default 5 manifold_metric : str, optional The distance metric used to calculate the k-Neighbors for each sample point. The DistanceMetric class gives a list of available metrics. By default 'cosine'. knn_mode : str, optional {'connectivity', 'distance'}, by default 'distance'. Type of returned matrix: 'connectivity' will return the connectivity matrix with ones and zeros, and 'distance' will return the distances between neighbors according to the given metric. kwargs: kernel param """ self.kwargs = kwargs self.kernel = kernel self.lambda_ = lambda_ self.gamma_ = gamma_ self.sigma_ = sigma_ self.k_neighbour = k_neighbour # self.coef_ = None self.knn_mode = knn_mode self.manifold_metric = manifold_metric self._lb = LabelBinarizer(pos_label=1, neg_label=-1)
[docs] def fit(self, X, y, covariates=None, target_covariate=None, unlabeled_value=None): """Fit the model according to the given training data. Parameters ---------- X : array-like Source and target data, shape (n_samples, n_features). y : array-like Source labels only, or one label per row in ``X``. If full-length labels include unlabeled target rows, mark them with ``unlabeled_value``. covariates : array-like, optional Binary domain labels aligned with ``X``. target_covariate : scalar, optional Domain value identifying target samples. Defaults to the last sorted covariate value. unlabeled_value : scalar, optional Sentinel used for unlabeled target rows when ``y`` is full length. """ X, y, x_kernel_matrix, M, unit_matrix = _prepare_artl_fit_data( self, X, y, covariates=covariates, target_covariate=target_covariate, unlabeled_value=unlabeled_value, metric=self.kernel, filter_params=True, **self.kwargs, ) n = x_kernel_matrix.shape[0] nl = y.shape[0] J = np.zeros((n, n)) J[:nl, :nl] = np.eye(nl) if self.gamma_ != 0: lap_mat = lap_norm(X, n_neighbour=self.k_neighbour, metric=self.manifold_metric, mode=self.knn_mode) Q_ = np.dot((J + self.lambda_ * M + self.gamma_ * lap_mat), x_kernel_matrix) + self.sigma_ * unit_matrix else: Q_ = np.dot((J + self.lambda_ * M), x_kernel_matrix) + self.sigma_ * unit_matrix y_ = self._lb.fit_transform(y) self.coef_ = self._solve_semi_ls(Q_, y_) self.X = X self.y = y return self
[docs] def predict(self, X): """Perform classification on samples in X. Parameters ---------- X : array-like shape (n_samples, n_features) Returns ------- array-like predicted labels, shape (n_samples) """ dec = to_numpy(self.decision_function(X)) return self._lb.inverse_transform(dec, threshold=0)
[docs] def decision_function(self, X): """Evaluates the decision function for the samples in X. Parameters ---------- X : array-like, shape (n_samples, n_features) Returns ------- array-like prediction scores, shape (n_samples) """ x_np = to_numpy(X) x_kernel_matrix = pairwise_kernels(x_np, self.X, metric=self.kernel, filter_params=True, **self.kwargs) scores = np.dot(x_kernel_matrix, self.coef_) return scores
[docs] def fit_predict(self, X, y, covariates=None, target_covariate=None, unlabeled_value=None): """Fit the model according to the given training data and then perform classification on target samples. Parameters ---------- X : array-like Combined source and target data. y : array-like Source labels or full-length labels. covariates : array-like, optional Binary domain labels aligned with ``X``. """ self.fit( X, y, covariates=covariates, target_covariate=target_covariate, unlabeled_value=unlabeled_value, ) return self.predict(to_numpy(X)[self.target_idx_])