"""These solvers are to be passed as arguments to `~.Solver` objects.
For example:
.. testcode::
from nengo.solvers import LstsqL2
from nengo.utils.least_squares_solvers import SVD
with nengo.Network():
ens_a = nengo.Ensemble(10, 1)
ens_b = nengo.Ensemble(10, 1)
nengo.Connection(ens_a, ens_b, solver=LstsqL2(solver=SVD()))
"""
import numpy as np
import nengo.utils.numpy as npext
from nengo.exceptions import ValidationError
from nengo.params import (
BoolParam,
FrozenObject,
IntParam,
NdarrayParam,
NumberParam,
Parameter,
)
def rmses(A, X, Y):
"""Returns the root-mean-squared error (RMSE) of the solution X."""
return npext.rms(Y - np.dot(A, X), axis=0)
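
def format_system(A, Y):
    """Extract system dimensions and reshape ``Y`` to a 2D array.

    A sketch reconstructed from the call sites below
    (``Y, m, n, d, matrix_in = format_system(A, Y)``); ``matrix_in`` records
    whether ``Y`` was passed as a matrix, so that vector solutions can be
    unraveled on return.
    """
    m, n = A.shape
    matrix_in = Y.ndim > 1
    d = Y.shape[1] if matrix_in else 1
    Y = Y.reshape((Y.shape[0], d))
    return Y, m, n, d, matrix_in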
class LeastSquaresSolver(FrozenObject):
"""Linear least squares system solver."""
def __call__(self, A, Y, sigma, rng=None):
        raise NotImplementedError("LeastSquaresSolver must implement __call__")
class Cholesky(LeastSquaresSolver):
"""Solve a least-squares system using the Cholesky decomposition."""
transpose = BoolParam("transpose", optional=True)
def __init__(self, transpose=None):
super().__init__()
self.transpose = transpose
def __call__(self, A, Y, sigma, rng=None):
m, n = A.shape
transpose = self.transpose
if transpose is None:
            # transpose if matrix is fat, but not if we have sigmas for each neuron
transpose = m < n and sigma.size == 1
if transpose:
# substitution: x = A'*xbar, G*xbar = b where G = A*A' + lambda*I
G = np.dot(A, A.T)
b = Y
else:
# multiplication by A': G*x = A'*b where G = A'*A + lambda*I
G = np.dot(A.T, A)
b = np.dot(A.T, Y)
# add L2 regularization term 'lambda' = m * sigma**2
np.fill_diagonal(G, G.diagonal() + m * sigma ** 2)
try:
import scipy.linalg # pylint: disable=import-outside-toplevel
factor = scipy.linalg.cho_factor(G, overwrite_a=True)
X = scipy.linalg.cho_solve(factor, b)
        except ImportError:
            # Scipy unavailable: solve via numpy by inverting the Cholesky
            # factor, so that X = (L L^T)^{-1} b = L^{-T} (L^{-1} b)
            L = np.linalg.cholesky(G)
            L = np.linalg.inv(L.T)
            X = np.dot(L, np.dot(L.T, b))
X = np.dot(A.T, X) if transpose else X
info = {"rmses": rmses(A, X, Y)}
return X, info
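
# Illustrative sketch (not from the original module): a minimal direct use of
# `Cholesky` on a small random system. All sizes and the regularization value
# are arbitrary demonstration choices.
def _demo_cholesky():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(50, 10))  # m=50 evaluation points, n=10 neurons
    Y = rng.uniform(-1, 1, size=(50, 2))  # d=2 output dimensions
    X, info = Cholesky()(A, Y, sigma=np.array(0.1))  # numpy scalar: the solver reads sigma.size
    assert X.shape == (10, 2)
    print(info["rmses"])  # RMSE for each output dimension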
class ConjgradScipy(LeastSquaresSolver):
    """Solve a least-squares system using Scipy's conjugate gradient.

    Parameters
    ----------
    tol : float
        Relative tolerance of the CG solver (see [1]_ for details).
    atol : float
        Absolute tolerance of the CG solver (see [1]_ for details).

    References
    ----------
    .. [1] scipy.sparse.linalg.cg documentation,
       https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.linalg.cg.html
    """
tol = NumberParam("tol", low=0)
atol = NumberParam("atol", low=0)
def __init__(self, tol=1e-4, atol=1e-8):
super().__init__()
self.tol = tol
self.atol = atol
def __call__(self, A, Y, sigma, rng=None):
import scipy.sparse.linalg # pylint: disable=import-outside-toplevel
Y, m, n, d, matrix_in = format_system(A, Y)
damp = m * sigma ** 2
calcAA = lambda x: np.dot(A.T, np.dot(A, x)) + damp * x
G = scipy.sparse.linalg.LinearOperator(
(n, n), matvec=calcAA, matmat=calcAA, dtype=A.dtype
)
B = np.dot(A.T, Y)
X = np.zeros((n, d), dtype=B.dtype)
infos = np.zeros(d, dtype="int")
itns = np.zeros(d, dtype="int")
for i in range(d):
# use the callback to count the number of iterations
def callback(x, i=i):
itns[i] += 1
try:
X[:, i], infos[i] = scipy.sparse.linalg.cg(
G, B[:, i], tol=self.tol, callback=callback, atol=self.atol
)
except TypeError as e: # pragma: no cover
# no atol parameter in Scipy < 1.1.0
if "atol" not in str(e):
raise e
X[:, i], infos[i] = scipy.sparse.linalg.cg(
G, B[:, i], tol=self.tol, callback=callback
)
info = {"rmses": rmses(A, X, Y), "iterations": itns, "info": infos}
return X if matrix_in else X.ravel(), info
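
# Illustrative sketch (not from the original module): `ConjgradScipy` on a
# random system. Requires Scipy; sizes, tolerance, and sigma are arbitrary
# demonstration values.
def _demo_conjgrad_scipy():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(200, 20))
    Y = rng.uniform(-1, 1, size=(200, 3))
    X, info = ConjgradScipy(tol=1e-6)(A, Y, sigma=np.array(0.1))
    print(info["iterations"])  # CG iteration counts, one per output dimension
    print(info["rmses"])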
class LSMRScipy(LeastSquaresSolver):
"""Solve a least-squares system using Scipy's LSMR."""
tol = NumberParam("tol", low=0)
def __init__(self, tol=1e-4):
super().__init__()
self.tol = tol
def __call__(self, A, Y, sigma, rng=None):
import scipy.sparse.linalg # pylint: disable=import-outside-toplevel
Y, m, n, d, matrix_in = format_system(A, Y)
damp = sigma * np.sqrt(m)
X = np.zeros((n, d), dtype=Y.dtype)
itns = np.zeros(d, dtype="int")
for i in range(d):
X[:, i], _, itns[i], _, _, _, _, _ = scipy.sparse.linalg.lsmr(
A, Y[:, i], damp=damp, atol=self.tol, btol=self.tol
)
info = {"rmses": rmses(A, X, Y), "iterations": itns}
return X if matrix_in else X.ravel(), info
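
# Illustrative sketch (not from the original module): `LSMRScipy` works on the
# damped system directly, without forming A^T A. Requires Scipy; values are
# arbitrary demonstration choices.
def _demo_lsmr_scipy():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(200, 20))
    Y = rng.uniform(-1, 1, size=(200, 3))
    X, info = LSMRScipy(tol=1e-6)(A, Y, sigma=np.array(0.1))
    print(info["iterations"], info["rmses"])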
class Conjgrad(LeastSquaresSolver):
"""Solve a least-squares system using conjugate gradient."""
tol = NumberParam("tol", low=0)
maxiters = IntParam("maxiters", low=1, optional=True)
X0 = NdarrayParam("X0", shape=("*", "*"), optional=True)
def __init__(self, tol=1e-2, maxiters=None, X0=None):
super().__init__()
self.tol = tol
self.maxiters = maxiters
self.X0 = X0
def __call__(self, A, Y, sigma, rng=None):
Y, m, n, d, matrix_in = format_system(A, Y)
X = np.zeros((n, d)) if self.X0 is None else np.array(self.X0)
if X.shape != (n, d):
raise ValidationError(
"Must be shape %s, got %s" % ((n, d), X.shape), attr="X0", obj=self
)
damp = m * sigma ** 2
rtol = self.tol * np.sqrt(m)
G = lambda x: np.dot(A.T, np.dot(A, x)) + damp * x
B = np.dot(A.T, Y)
iters = -np.ones(d, dtype="int")
for i in range(d):
X[:, i], iters[i] = self._conjgrad_iters(
G, B[:, i], X[:, i], maxiters=self.maxiters, rtol=rtol
)
info = {"rmses": rmses(A, X, Y), "iterations": iters}
return X if matrix_in else X.ravel(), info
@staticmethod
def _conjgrad_iters(calcAx, b, x, maxiters=None, rtol=1e-6):
"""Solve a single-RHS linear system using conjugate gradient."""
if maxiters is None:
maxiters = b.shape[0]
r = b - calcAx(x)
p = r.copy()
rsold = np.dot(r, r)
for i in range(maxiters):
Ap = calcAx(p)
alpha = rsold / np.dot(p, Ap)
x += alpha * p
r -= alpha * Ap
rsnew = np.dot(r, r)
beta = rsnew / rsold
if np.sqrt(rsnew) < rtol:
break
if beta < 1e-12: # no perceptible change in p
break
# p = r + beta*p
p *= beta
p += r
rsold = rsnew
return x, i + 1
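
# Illustrative sketch (not from the original module): the pure-numpy `Conjgrad`
# should match a direct solve of the regularized normal equations
# (A^T A + m sigma^2 I) X = A^T Y. Values are arbitrary demonstration choices.
def _demo_conjgrad():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(100, 10))
    Y = rng.uniform(-1, 1, size=(100, 2))
    sigma = np.array(0.1)
    X, _ = Conjgrad(tol=1e-8)(A, Y, sigma)
    m, n = A.shape
    G = np.dot(A.T, A) + m * sigma ** 2 * np.eye(n)
    assert np.allclose(X, np.linalg.solve(G, np.dot(A.T, Y)), atol=1e-6)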
class BlockConjgrad(LeastSquaresSolver):
    """Solve a multiple-RHS least-squares system using block conjugate gradient."""
tol = NumberParam("tol", low=0)
X0 = NdarrayParam("X0", shape=("*", "*"), optional=True)
def __init__(self, tol=1e-2, X0=None):
super().__init__()
self.tol = tol
self.X0 = X0
def __call__(self, A, Y, sigma, rng=None):
Y, m, n, d, matrix_in = format_system(A, Y)
sigma = np.asarray(sigma, dtype="float")
sigma = sigma.reshape(sigma.size, 1)
X = np.zeros((n, d)) if self.X0 is None else np.array(self.X0)
if X.shape != (n, d):
raise ValidationError(
"Must be shape %s, got %s" % ((n, d), X.shape), attr="X0", obj=self
)
damp = m * sigma ** 2
rtol = self.tol * np.sqrt(m)
G = lambda x: np.dot(A.T, np.dot(A, x)) + damp * x
B = np.dot(A.T, Y)
# --- conjugate gradient
R = B - G(X)
P = np.array(R)
Rsold = np.dot(R.T, R)
AP = np.zeros((n, d))
maxiters = int(n / d)
for i in range(maxiters):
AP = G(P)
alpha = np.linalg.solve(np.dot(P.T, AP), Rsold)
X += np.dot(P, alpha)
R -= np.dot(AP, alpha)
Rsnew = np.dot(R.T, R)
if (np.diag(Rsnew) < rtol ** 2).all():
break
beta = np.linalg.solve(Rsold, Rsnew)
P = R + np.dot(P, beta)
Rsold = Rsnew
info = {"rmses": rmses(A, X, Y), "iterations": i + 1}
return X if matrix_in else X.ravel(), info
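
# Illustrative sketch (not from the original module): `BlockConjgrad` iterates
# on all output dimensions at once instead of solving them one at a time.
# Values are arbitrary demonstration choices.
def _demo_block_conjgrad():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(100, 20))
    Y = rng.uniform(-1, 1, size=(100, 4))
    X, info = BlockConjgrad(tol=1e-4)(A, Y, sigma=np.array(0.1))
    print(info["iterations"], info["rmses"])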
class SVD(LeastSquaresSolver):
"""Solve a least-squares system using full SVD."""
def __call__(self, A, Y, sigma, rng=None):
Y, m, _, _, matrix_in = format_system(A, Y)
        U, s, V = np.linalg.svd(A, full_matrices=False)
si = s / (s ** 2 + m * sigma ** 2)
X = np.dot(V.T, si[:, None] * np.dot(U.T, Y))
info = {"rmses": npext.rms(Y - np.dot(A, X), axis=0)}
return X if matrix_in else X.ravel(), info
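
# Illustrative sketch (not from the original module): with sigma=0, `SVD`
# reduces to the ordinary (pseudoinverse) least-squares solution. Sizes are
# arbitrary demonstration choices.
def _demo_svd():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(30, 5))
    Y = rng.uniform(-1, 1, size=(30, 2))
    X, _ = SVD()(A, Y, sigma=np.array(0.0))
    assert np.allclose(X, np.linalg.lstsq(A, Y, rcond=None)[0])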
class RandomizedSVD(LeastSquaresSolver):
    """Solve a least-squares system using a randomized (partial) SVD.

    Useful for solving large matrices quickly, but non-optimally.

    Parameters
    ----------
    n_components : int, optional
        The number of SVD components to compute. A small survey of activity
        matrices suggests that the first 60 components capture almost all
        the variance.
    n_oversamples : int, optional
        The number of additional samples on the range of A.
    n_iter : int, optional
        The number of power iterations to perform (can help with noisy data).

    See Also
    --------
    sklearn.utils.extmath.randomized_svd : Function used by this class
    """
n_components = IntParam("n_components", low=1)
n_oversamples = IntParam("n_oversamples", low=0)
n_iter = IntParam("n_iter", low=0)
    def __init__(self, n_components=60, n_oversamples=10, n_iter=0):
        # import here so a missing scikit-learn fails at construction time,
        # not when the solver is first called
        from sklearn.utils.extmath import (  # pylint: disable=import-outside-toplevel
            randomized_svd,
        )

        assert randomized_svd  # silence the unused-import warning
super().__init__()
self.n_components = n_components
self.n_oversamples = n_oversamples
self.n_iter = n_iter
def __call__(self, A, Y, sigma, rng=np.random):
from sklearn.utils.extmath import ( # pylint: disable=import-outside-toplevel
randomized_svd,
)
Y, m, n, _, matrix_in = format_system(A, Y)
if min(m, n) <= self.n_components + self.n_oversamples:
# more efficient to do a full SVD
return SVD()(A, Y, sigma, rng=rng)
U, s, V = randomized_svd(
A,
self.n_components,
n_oversamples=self.n_oversamples,
n_iter=self.n_iter,
random_state=rng,
)
si = s / (s ** 2 + m * sigma ** 2)
X = np.dot(V.T, si[:, None] * np.dot(U.T, Y))
info = {"rmses": npext.rms(Y - np.dot(A, X), axis=0)}
return X if matrix_in else X.ravel(), info
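
# Illustrative sketch (not from the original module): `RandomizedSVD` on a
# system large enough to avoid the full-SVD fallback. Requires scikit-learn;
# sizes are arbitrary demonstration choices.
def _demo_randomized_svd():
    rng = np.random.RandomState(0)
    A = rng.uniform(-1, 1, size=(500, 100))
    Y = rng.uniform(-1, 1, size=(500, 2))
    X, info = RandomizedSVD(n_components=60)(A, Y, np.array(0.1), rng=rng)
    print(info["rmses"])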
class LeastSquaresSolverParam(Parameter):
    """A parameter whose value must be a `.LeastSquaresSolver` instance."""
def coerce(self, instance, solver):
self.check_type(instance, solver, LeastSquaresSolver)
return super().coerce(instance, solver)
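
# Note (added for context): solver classes such as `nengo.solvers.LstsqL2` use
# `LeastSquaresSolverParam` to validate their ``solver`` argument, as in the
# module docstring example above.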