Source code for eqc_models.ml.classifierqboost

# (C) Quantum Computing Inc., 2024.
# Import libs
import os
import sys
import time
import datetime
import json
import gc
import warnings
from functools import wraps
from multiprocessing import shared_memory, Pool, set_start_method, Manager
from multiprocessing.managers import SharedMemoryManager
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF

from eqc_models.ml.classifierbase import ClassifierBase
#from eqc_models.ml.cvqboost_hamiltonian import get_hamiltonian_pyx


def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        beg_time = time.time()
        val = func(*args, **kwargs)
        end_time = time.time()
        tot_time = end_time - beg_time

        print(
            "Runtime of %s: %0.2f seconds!"
            % (
                func.__name__,
                tot_time,
            )
        )

        return val

    return wrapper


class WeakClassifier:
    def __init__(
        self,
        X_train,
        y_train,
        weak_cls_type,
        max_depth=10,
        min_samples_split=100,
        num_jobs=1,
    ):
        assert X_train.shape[0] == len(y_train)

        self.X_train = X_train
        self.y_train = y_train

        if weak_cls_type == "dct":
            self.clf = DecisionTreeClassifier(
                max_depth=max_depth,
                min_samples_split=min_samples_split,
                random_state=0,
            )
        elif weak_cls_type == "nb":
            self.clf = GaussianNB()
        elif weak_cls_type == "lg":
            self.clf = LogisticRegression(random_state=0)
        elif weak_cls_type == "gp":
            self.clf = GaussianProcessClassifier(
                kernel=1.0 * RBF(1.0),
                random_state=0,
            )
        else:
            assert False, (
                "Unknown weak classifier type <%s>!" % weak_cls_type
            )

    def train(self):
        self.clf.fit(self.X_train, self.y_train)

    def predict(self, X):
        return self.clf.predict(X)


[docs] class QBoostClassifier(ClassifierBase): """An implementation of QBoost classifier that uses QCi's Dirac-3. Parameters ---------- relaxation_schedule: Relaxation schedule used by Dirac-3; default: 2. num_samples: Number of samples used by Dirac-3; default: 1. lambda_coef: A penalty multiplier; default: 0. weak_cls_schedule: Weak classifier schedule. Is either 1, 2, or 3; default: 2. weak_cls_type: Type of weak classifier - dct: Decison tree classifier - nb: Naive Baysian classifier - lg: Logistic regression - gp: Gaussian process classifier default: dct. weak_max_depth: Max depth of the tree. Applied only when weak_cls_type="dct". Default: 10. weak_min_samples_split: The minimum number of samples required to split an internal node. Applied only when weak_cls_type="dct". Default: 100. Examples ----------- >>> from sklearn import datasets >>> from sklearn.preprocessing import MinMaxScaler >>> from sklearn.model_selection import train_test_split >>> iris = datasets.load_iris() >>> X = iris.data >>> y = iris.target >>> scaler = MinMaxScaler() >>> X = scaler.fit_transform(X) >>> for i in range(len(y)): ... if y[i] == 0: ... y[i] = -1 ... elif y[i] == 2: ... y[i] = 1 >>> X_train, X_test, y_train, y_test = train_test_split( ... X, ... y, ... test_size=0.2, ... random_state=42, ... ) >>> from eqc_models.ml.classifierqboost import QBoostClassifier >>> obj = QBoostClassifier( ... relaxation_schedule=2, ... num_samples=1, ... lambda_coef=0.0, ... ) >>> from contextlib import redirect_stdout >>> import io >>> f = io.StringIO() >>> with redirect_stdout(f): ... obj.fit(X_train, y_train) ... y_train_prd = obj.predict(X_train) ... y_test_prd = obj.predict(X_test) """ def __init__( self, relaxation_schedule=2, num_samples=1, lambda_coef=0, weak_cls_schedule=2, weak_cls_type="lg", weak_max_depth=10, weak_min_samples_split=100, weak_cls_strategy="multi_processing", weak_cls_num_jobs=None, ): super(QBoostClassifier).__init__() assert weak_cls_schedule in [1, 2, 3] assert weak_cls_type in ["dct", "nb", "lg", "gp"] assert weak_cls_strategy in [ "multi_processing", "multi_processing_shm", "sequential", ] self.relaxation_schedule = relaxation_schedule self.num_samples = num_samples self.lambda_coef = lambda_coef self.weak_cls_schedule = weak_cls_schedule self.weak_cls_type = weak_cls_type self.weak_max_depth = weak_max_depth self.weak_min_samples_split = weak_min_samples_split self.weak_cls_strategy = weak_cls_strategy if weak_cls_num_jobs is None or weak_cls_num_jobs <= 0: self.weak_cls_num_jobs = os.cpu_count() else: self.weak_cls_num_jobs = int(weak_cls_num_jobs) self.h_list = [] self.ind_list = [] self.classes_ = None @timer def _build_weak_classifiers_sq(self, X, y): n_records = X.shape[0] n_dims = X.shape[1] assert len(y) == n_records self.h_list = [] self.ind_list = [] num_workers = self.weak_cls_num_jobs tasks = [] for l in range(n_dims): weak_classifier = WeakClassifier( X[:, [l]], y, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) weak_classifier.train() self.ind_list.append([l]) self.h_list.append(weak_classifier) if self.weak_cls_schedule >= 2: for i in range(n_dims): for j in range(i + 1, n_dims): weak_classifier = WeakClassifier( X[:, [i, j]], y, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) weak_classifier.train() self.ind_list.append([i, j]) self.h_list.append(weak_classifier) if self.weak_cls_schedule >= 3: for i in range(n_dims): for j in range(i + 1, n_dims): for k in range(j + 1, n_dims): weak_classifier = WeakClassifier( X[:, [i, j, k]], y, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) weak_classifier.train() self.ind_list.append([i, j, k]) self.h_list.append(weak_classifier) return def _train_weak_classifier_mp( self, indices, X_subset, y, n_records, n_dims, weak_cls_type, weak_max_depth, weak_min_samples_split, ): # Train the weak classifier weak_classifier = WeakClassifier( X_subset, y, weak_cls_type, weak_max_depth, weak_min_samples_split, ) weak_classifier.train() return indices, weak_classifier @timer def _build_weak_classifiers_mp(self, X, y): n_records = X.shape[0] n_dims = X.shape[1] assert len(y) == n_records self.h_list = [] self.ind_list = [] num_workers = self.weak_cls_num_jobs print(f"Using {num_workers} workers to build weak classifiers.") set_start_method("fork", force=True) tasks = [] for l in range(n_dims): tasks.append( ( [l], X[:, [l]], y, n_records, n_dims, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) ) if self.weak_cls_schedule >= 2: for i in range(n_dims): for j in range(i + 1, n_dims): tasks.append( ( [i, j], X[:, [i, j]], y, n_records, n_dims, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) ) if self.weak_cls_schedule >= 3: for i in range(n_dims): for j in range(i + 1, n_dims): for k in range(j + 1, n_dims): tasks.append( ( [i, j, k], X[:, [i, j, k]], y, n_records, n_dims, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) ) # Parallel execution using Pool with Pool(processes=num_workers) as pool: results = pool.starmap(self._train_weak_classifier_mp, tasks) pool.join() pool.close() for indices, weak_classifier in results: self.ind_list.append(indices) self.h_list.append(weak_classifier) return def _train_weak_classifier_shm( self, indices, shm_X_name, shm_y_name, shared_list, n_records, n_dims, weak_cls_type, weak_max_depth, weak_min_samples_split, ): """Train a weak classifier using shared memory.""" shm_X_worker = shared_memory.SharedMemory(name=shm_X_name) shm_y_worker = shared_memory.SharedMemory(name=shm_y_name) X_shared = np.ndarray( (n_records, n_dims), dtype=np.float32, buffer=shm_X_worker.buf ) y_shared = np.ndarray( (n_records,), dtype=np.float32, buffer=shm_y_worker.buf ) X_subset = X_shared[:, indices] weak_classifier = WeakClassifier( X_subset, y_shared, weak_cls_type, weak_max_depth, weak_min_samples_split, ) weak_classifier.train() shared_list.append((indices, weak_classifier)) shm_X_worker.close() shm_y_worker.close() @timer def _build_weak_classifiers_shm(self, X, y): n_records = X.shape[0] n_dims = X.shape[1] assert len(y) == n_records self.h_list = [] self.ind_list = [] num_workers = self.weak_cls_num_jobs print(f"Using {num_workers} workers to build weak classifiers.") set_start_method("fork", force=True) X = np.ascontiguousarray(X, dtype=np.float32) y = np.ascontiguousarray(y, dtype=np.float32) with SharedMemoryManager() as shm_manager: shm_X = shm_manager.SharedMemory(size=X.nbytes) shm_y = shm_manager.SharedMemory(size=y.nbytes) X_shared = np.ndarray(X.shape, dtype=X.dtype, buffer=shm_X.buf) y_shared = np.ndarray(y.shape, dtype=y.dtype, buffer=shm_y.buf) np.copyto(X_shared, X) np.copyto(y_shared, y) with Manager() as manager: shared_list = manager.list() tasks = [] for l in range(n_dims): tasks.append( ( [l], shm_X.name, shm_y.name, shared_list, n_records, n_dims, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) ) if self.weak_cls_schedule >= 2: for i in range(n_dims): for j in range(i + 1, n_dims): tasks.append( ( [i, j], shm_X.name, shm_y.name, shared_list, n_records, n_dims, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) ) if self.weak_cls_schedule >= 3: for i in range(n_dims): for j in range(i + 1, n_dims): for k in range(j + 1, n_dims): tasks.append( ( [i, j, k], shm_X.name, shm_y.name, shared_list, n_records, n_dims, self.weak_cls_type, self.weak_max_depth, self.weak_min_samples_split, ) ) with Pool(processes=num_workers) as pool: results = pool.starmap( self._train_weak_classifier_shm, tasks ) pool.close() pool.join() for item in list(shared_list): self.ind_list.append(item[0]) self.h_list.append(item[1]) shm_X.close() shm_X.unlink() shm_y.close() shm_y.unlink() def _infer_one_weak_classifier(self, cls_ind, X_subset): return self.h_list[cls_ind].predict(X_subset) def _infer_weak_classifiers(self, X): n_classifiers = len(self.h_list) num_workers = self.weak_cls_num_jobs print(f"Using {num_workers} workers for inference.") set_start_method("fork", force=True) tasks = [] for i in range(n_classifiers): tasks.append((i, X[:, self.ind_list[i]])) with Pool(processes=num_workers) as pool: results = pool.starmap(self._infer_one_weak_classifier, tasks) return list(results)
[docs] def fit(self, X, y): """ Build a QBoost classifier from the training set (X, y). Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) The training input samples. y : array-like of shape (n_samples,) The target values. Returns ------- Response of Dirac-3 in JSON format. """ assert X.shape[0] == y.shape[0], "Inconsistent sizes!" assert set(y) == {-1, 1}, "Target values should be in {-1, 1}" self.classes_ = set(y) J, C, sum_constraint = self.get_hamiltonian(X, y) assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!" assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!" self.set_model(J, C, sum_constraint) sol, response = self.solve() assert len(sol) == C.shape[0], "Inconsistent solution size!" self.params = self.convert_sol_to_params(sol) assert len(self.params) == len(self.h_list), "Inconsistent size!" return response
[docs] def predict_raw(self, X: np.array): """ Predict raw output of the classifier for input X. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Returns ------- y : ndarray of shape (n_samples,) The predicted raw output of the classifier. """ n_records = X.shape[0] n_classifiers = len(self.h_list) y = np.zeros(shape=(n_records), dtype=np.float32) h_vals = np.array( [ self.h_list[i].predict(X[:, self.ind_list[i]]) for i in range(n_classifiers) ] ) y = np.tensordot(self.params, h_vals, axes=(0, 0)) return y
[docs] def predict(self, X: np.array): """ Predict classes for X. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Returns ------- y : ndarray of shape (n_samples,) The predicted classes. """ y = self.predict_raw(X) y = np.sign(y) return y
[docs] @timer def get_hamiltonian( self, X: np.array, y: np.array, ): X = np.array(X, dtype=np.float32) y = np.array(y, dtype=np.float32) if self.weak_cls_strategy == "multi_processing": self._build_weak_classifiers_mp(X, y) elif self.weak_cls_strategy == "multi_processing_shm": self._build_weak_classifiers_shm(X, y) elif self.weak_cls_strategy == "sequential": self._build_weak_classifiers_sq(X, y) print("Built %d weak classifiers!" % len(self.h_list)) n_classifiers = len(self.h_list) n_records = X.shape[0] h_vals = np.array( [ self.h_list[i].predict(X[:, self.ind_list[i]]) for i in range(n_classifiers) ] ) J = np.tensordot(h_vals, h_vals, axes=(1, 1)) J += np.diag(self.lambda_coef * np.ones((n_classifiers))) C = -2.0 * np.tensordot(h_vals, y, axes=(1, 0)) # J, C = get_hamiltonian_pyx(y, h_vals, self.lambda_coef, n_records) C = C.reshape((n_classifiers, 1)) return J, C, 1.0
[docs] def convert_sol_to_params(self, sol): return np.array(sol)