# (C) Quantum Computing Inc., 2024.
# Import libs
import os
import sys
import time
import datetime
import json
import gc
import warnings
from functools import wraps
from multiprocessing import shared_memory, Pool, set_start_method, Manager
from multiprocessing.managers import SharedMemoryManager
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from eqc_models.ml.classifierbase import ClassifierBase
#from eqc_models.ml.cvqboost_hamiltonian import get_hamiltonian_pyx
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
beg_time = time.time()
val = func(*args, **kwargs)
end_time = time.time()
tot_time = end_time - beg_time
print(
"Runtime of %s: %0.2f seconds!"
% (
func.__name__,
tot_time,
)
)
return val
return wrapper
class WeakClassifier:
def __init__(
self,
X_train,
y_train,
weak_cls_type,
max_depth=10,
min_samples_split=100,
num_jobs=1,
):
assert X_train.shape[0] == len(y_train)
self.X_train = X_train
self.y_train = y_train
if weak_cls_type == "dct":
self.clf = DecisionTreeClassifier(
max_depth=max_depth,
min_samples_split=min_samples_split,
random_state=0,
)
elif weak_cls_type == "nb":
self.clf = GaussianNB()
elif weak_cls_type == "lg":
self.clf = LogisticRegression(random_state=0)
elif weak_cls_type == "gp":
self.clf = GaussianProcessClassifier(
kernel=1.0 * RBF(1.0),
random_state=0,
)
else:
assert False, (
"Unknown weak classifier type <%s>!" % weak_cls_type
)
def train(self):
self.clf.fit(self.X_train, self.y_train)
def predict(self, X):
return self.clf.predict(X)
[docs]
class QBoostClassifier(ClassifierBase):
"""An implementation of QBoost classifier that uses QCi's Dirac-3.
Parameters
----------
relaxation_schedule: Relaxation schedule used by Dirac-3;
default: 2.
num_samples: Number of samples used by Dirac-3; default: 1.
lambda_coef: A penalty multiplier; default: 0.
weak_cls_schedule: Weak classifier schedule. Is either 1, 2,
or 3; default: 2.
weak_cls_type: Type of weak classifier
- dct: Decison tree classifier
- nb: Naive Baysian classifier
- lg: Logistic regression
- gp: Gaussian process classifier
default: dct.
weak_max_depth: Max depth of the tree. Applied only when
weak_cls_type="dct". Default: 10.
weak_min_samples_split: The minimum number of samples required
to split an internal node. Applied only when
weak_cls_type="dct". Default: 100.
Examples
-----------
>>> from sklearn import datasets
>>> from sklearn.preprocessing import MinMaxScaler
>>> from sklearn.model_selection import train_test_split
>>> iris = datasets.load_iris()
>>> X = iris.data
>>> y = iris.target
>>> scaler = MinMaxScaler()
>>> X = scaler.fit_transform(X)
>>> for i in range(len(y)):
... if y[i] == 0:
... y[i] = -1
... elif y[i] == 2:
... y[i] = 1
>>> X_train, X_test, y_train, y_test = train_test_split(
... X,
... y,
... test_size=0.2,
... random_state=42,
... )
>>> from eqc_models.ml.classifierqboost import QBoostClassifier
>>> obj = QBoostClassifier(
... relaxation_schedule=2,
... num_samples=1,
... lambda_coef=0.0,
... )
>>> from contextlib import redirect_stdout
>>> import io
>>> f = io.StringIO()
>>> with redirect_stdout(f):
... obj.fit(X_train, y_train)
... y_train_prd = obj.predict(X_train)
... y_test_prd = obj.predict(X_test)
"""
def __init__(
self,
relaxation_schedule=2,
num_samples=1,
lambda_coef=0,
weak_cls_schedule=2,
weak_cls_type="lg",
weak_max_depth=10,
weak_min_samples_split=100,
weak_cls_strategy="multi_processing",
weak_cls_num_jobs=None,
):
super(QBoostClassifier).__init__()
assert weak_cls_schedule in [1, 2, 3]
assert weak_cls_type in ["dct", "nb", "lg", "gp"]
assert weak_cls_strategy in [
"multi_processing",
"multi_processing_shm",
"sequential",
]
self.relaxation_schedule = relaxation_schedule
self.num_samples = num_samples
self.lambda_coef = lambda_coef
self.weak_cls_schedule = weak_cls_schedule
self.weak_cls_type = weak_cls_type
self.weak_max_depth = weak_max_depth
self.weak_min_samples_split = weak_min_samples_split
self.weak_cls_strategy = weak_cls_strategy
if weak_cls_num_jobs is None or weak_cls_num_jobs <= 0:
self.weak_cls_num_jobs = os.cpu_count()
else:
self.weak_cls_num_jobs = int(weak_cls_num_jobs)
self.h_list = []
self.ind_list = []
self.classes_ = None
@timer
def _build_weak_classifiers_sq(self, X, y):
n_records = X.shape[0]
n_dims = X.shape[1]
assert len(y) == n_records
self.h_list = []
self.ind_list = []
num_workers = self.weak_cls_num_jobs
tasks = []
for l in range(n_dims):
weak_classifier = WeakClassifier(
X[:, [l]],
y,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
weak_classifier.train()
self.ind_list.append([l])
self.h_list.append(weak_classifier)
if self.weak_cls_schedule >= 2:
for i in range(n_dims):
for j in range(i + 1, n_dims):
weak_classifier = WeakClassifier(
X[:, [i, j]],
y,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
weak_classifier.train()
self.ind_list.append([i, j])
self.h_list.append(weak_classifier)
if self.weak_cls_schedule >= 3:
for i in range(n_dims):
for j in range(i + 1, n_dims):
for k in range(j + 1, n_dims):
weak_classifier = WeakClassifier(
X[:, [i, j, k]],
y,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
weak_classifier.train()
self.ind_list.append([i, j, k])
self.h_list.append(weak_classifier)
return
def _train_weak_classifier_mp(
self,
indices,
X_subset,
y,
n_records,
n_dims,
weak_cls_type,
weak_max_depth,
weak_min_samples_split,
):
# Train the weak classifier
weak_classifier = WeakClassifier(
X_subset,
y,
weak_cls_type,
weak_max_depth,
weak_min_samples_split,
)
weak_classifier.train()
return indices, weak_classifier
@timer
def _build_weak_classifiers_mp(self, X, y):
n_records = X.shape[0]
n_dims = X.shape[1]
assert len(y) == n_records
self.h_list = []
self.ind_list = []
num_workers = self.weak_cls_num_jobs
print(f"Using {num_workers} workers to build weak classifiers.")
set_start_method("fork", force=True)
tasks = []
for l in range(n_dims):
tasks.append(
(
[l],
X[:, [l]],
y,
n_records,
n_dims,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
)
if self.weak_cls_schedule >= 2:
for i in range(n_dims):
for j in range(i + 1, n_dims):
tasks.append(
(
[i, j],
X[:, [i, j]],
y,
n_records,
n_dims,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
)
if self.weak_cls_schedule >= 3:
for i in range(n_dims):
for j in range(i + 1, n_dims):
for k in range(j + 1, n_dims):
tasks.append(
(
[i, j, k],
X[:, [i, j, k]],
y,
n_records,
n_dims,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
)
# Parallel execution using Pool
with Pool(processes=num_workers) as pool:
results = pool.starmap(self._train_weak_classifier_mp, tasks)
pool.join()
pool.close()
for indices, weak_classifier in results:
self.ind_list.append(indices)
self.h_list.append(weak_classifier)
return
def _train_weak_classifier_shm(
self,
indices,
shm_X_name,
shm_y_name,
shared_list,
n_records,
n_dims,
weak_cls_type,
weak_max_depth,
weak_min_samples_split,
):
"""Train a weak classifier using shared memory."""
shm_X_worker = shared_memory.SharedMemory(name=shm_X_name)
shm_y_worker = shared_memory.SharedMemory(name=shm_y_name)
X_shared = np.ndarray(
(n_records, n_dims), dtype=np.float32, buffer=shm_X_worker.buf
)
y_shared = np.ndarray(
(n_records,), dtype=np.float32, buffer=shm_y_worker.buf
)
X_subset = X_shared[:, indices]
weak_classifier = WeakClassifier(
X_subset,
y_shared,
weak_cls_type,
weak_max_depth,
weak_min_samples_split,
)
weak_classifier.train()
shared_list.append((indices, weak_classifier))
shm_X_worker.close()
shm_y_worker.close()
@timer
def _build_weak_classifiers_shm(self, X, y):
n_records = X.shape[0]
n_dims = X.shape[1]
assert len(y) == n_records
self.h_list = []
self.ind_list = []
num_workers = self.weak_cls_num_jobs
print(f"Using {num_workers} workers to build weak classifiers.")
set_start_method("fork", force=True)
X = np.ascontiguousarray(X, dtype=np.float32)
y = np.ascontiguousarray(y, dtype=np.float32)
with SharedMemoryManager() as shm_manager:
shm_X = shm_manager.SharedMemory(size=X.nbytes)
shm_y = shm_manager.SharedMemory(size=y.nbytes)
X_shared = np.ndarray(X.shape, dtype=X.dtype, buffer=shm_X.buf)
y_shared = np.ndarray(y.shape, dtype=y.dtype, buffer=shm_y.buf)
np.copyto(X_shared, X)
np.copyto(y_shared, y)
with Manager() as manager:
shared_list = manager.list()
tasks = []
for l in range(n_dims):
tasks.append(
(
[l],
shm_X.name,
shm_y.name,
shared_list,
n_records,
n_dims,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
)
if self.weak_cls_schedule >= 2:
for i in range(n_dims):
for j in range(i + 1, n_dims):
tasks.append(
(
[i, j],
shm_X.name,
shm_y.name,
shared_list,
n_records,
n_dims,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
)
if self.weak_cls_schedule >= 3:
for i in range(n_dims):
for j in range(i + 1, n_dims):
for k in range(j + 1, n_dims):
tasks.append(
(
[i, j, k],
shm_X.name,
shm_y.name,
shared_list,
n_records,
n_dims,
self.weak_cls_type,
self.weak_max_depth,
self.weak_min_samples_split,
)
)
with Pool(processes=num_workers) as pool:
results = pool.starmap(
self._train_weak_classifier_shm, tasks
)
pool.close()
pool.join()
for item in list(shared_list):
self.ind_list.append(item[0])
self.h_list.append(item[1])
shm_X.close()
shm_X.unlink()
shm_y.close()
shm_y.unlink()
def _infer_one_weak_classifier(self, cls_ind, X_subset):
return self.h_list[cls_ind].predict(X_subset)
def _infer_weak_classifiers(self, X):
n_classifiers = len(self.h_list)
num_workers = self.weak_cls_num_jobs
print(f"Using {num_workers} workers for inference.")
set_start_method("fork", force=True)
tasks = []
for i in range(n_classifiers):
tasks.append((i, X[:, self.ind_list[i]]))
with Pool(processes=num_workers) as pool:
results = pool.starmap(self._infer_one_weak_classifier, tasks)
return list(results)
[docs]
def fit(self, X, y):
"""
Build a QBoost classifier from the training set (X, y).
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
The training input samples.
y : array-like of shape (n_samples,)
The target values.
Returns
-------
Response of Dirac-3 in JSON format.
"""
assert X.shape[0] == y.shape[0], "Inconsistent sizes!"
assert set(y) == {-1, 1}, "Target values should be in {-1, 1}"
self.classes_ = set(y)
J, C, sum_constraint = self.get_hamiltonian(X, y)
assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"
self.set_model(J, C, sum_constraint)
sol, response = self.solve()
assert len(sol) == C.shape[0], "Inconsistent solution size!"
self.params = self.convert_sol_to_params(sol)
assert len(self.params) == len(self.h_list), "Inconsistent size!"
return response
[docs]
def predict_raw(self, X: np.array):
"""
Predict raw output of the classifier for input X.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Returns
-------
y : ndarray of shape (n_samples,)
The predicted raw output of the classifier.
"""
n_records = X.shape[0]
n_classifiers = len(self.h_list)
y = np.zeros(shape=(n_records), dtype=np.float32)
h_vals = np.array(
[
self.h_list[i].predict(X[:, self.ind_list[i]])
for i in range(n_classifiers)
]
)
y = np.tensordot(self.params, h_vals, axes=(0, 0))
return y
[docs]
def predict(self, X: np.array):
"""
Predict classes for X.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Returns
-------
y : ndarray of shape (n_samples,)
The predicted classes.
"""
y = self.predict_raw(X)
y = np.sign(y)
return y
[docs]
@timer
def get_hamiltonian(
self,
X: np.array,
y: np.array,
):
X = np.array(X, dtype=np.float32)
y = np.array(y, dtype=np.float32)
if self.weak_cls_strategy == "multi_processing":
self._build_weak_classifiers_mp(X, y)
elif self.weak_cls_strategy == "multi_processing_shm":
self._build_weak_classifiers_shm(X, y)
elif self.weak_cls_strategy == "sequential":
self._build_weak_classifiers_sq(X, y)
print("Built %d weak classifiers!" % len(self.h_list))
n_classifiers = len(self.h_list)
n_records = X.shape[0]
h_vals = np.array(
[
self.h_list[i].predict(X[:, self.ind_list[i]])
for i in range(n_classifiers)
]
)
J = np.tensordot(h_vals, h_vals, axes=(1, 1))
J += np.diag(self.lambda_coef * np.ones((n_classifiers)))
C = -2.0 * np.tensordot(h_vals, y, axes=(1, 0))
# J, C = get_hamiltonian_pyx(y, h_vals, self.lambda_coef, n_records)
C = C.reshape((n_classifiers, 1))
return J, C, 1.0
[docs]
def convert_sol_to_params(self, sol):
return np.array(sol)