'''
Created on 16 Oct 2015
@author: Max Zwiessele
'''
import numpy as np
from ..model import Model
from ..core.observable_array import ObsAr
from ..param import Param
from ..caching import Cacher
from ..parameterized import Parameterized
class RidgeRegression(Model):
    '''
    Ridge regression with regularization.
    For any regularization to work we use gradient-based optimization.
    '''
    def __init__(self, X, Y, regularizer=None, basis=None, name='ridge_regression'):
        '''
        :param array-like X: the inputs X of the regression problem
        :param array-like Y: the outputs Y
        :param :py:class:`paramz.examples.ridge_regression.Regularizer` regularizer: the regularizer to use [default: Ridge(1)]
        :param :py:class:`paramz.examples.ridge_regression.Basis` basis: the basis to use [default: Polynomial(1)]
        :param str name: the name of this regression object
        '''
        super(RidgeRegression, self).__init__(name=name)
        assert X.ndim == 2, 'inputs need to be at least a column vector'
        assert Y.ndim == 2, 'outputs need to be at least a column vector'
        self.X = ObsAr(X)
        self.Y = ObsAr(Y)
        if basis is None:
            basis = Polynomial(1)
        self.basis = basis
        if regularizer is None:
            regularizer = Ridge(1)
        self.regularizer = regularizer
        self.regularizer.init(basis, X.shape[1])
        self.link_parameters(self.regularizer, self.basis)
    @property
    def weights(self):
        return self.regularizer.weights

    @property
    def degree(self):
        return self.basis.degree

    @property
    def _phi(self):
        return self.basis._basis
    def phi(self, Xpred, degrees=None):
        """
        Compute the design matrix for this model,
        using the degrees given by the index array
        in degrees.

        :param array-like Xpred: inputs to compute the design matrix for
        :param array-like degrees: array of degrees to use [default=range(self.degree+1)]
        :returns array-like phi: the design matrix [#degrees x #samples x #dimensions]
        """
        assert Xpred.shape[1] == self.X.shape[1], "Need to predict with same shape as training data."
        if degrees is None:
            degrees = range(self.basis.degree + 1)
        tmp_phi = np.empty((len(degrees), Xpred.shape[0], Xpred.shape[1]))
        for i, w in enumerate(degrees):
            # Evaluate the basis for degree w and scale it by its weights:
            tmpX = self._phi(Xpred, w)
            tmp_phi[i] = tmpX * self.weights[[w], :]
        return tmp_phi
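
    # A shape sketch for phi (hypothetical numbers, not from the original
    # module): with X of shape (20, 1) and basis=Polynomial(2),
    #   m.phi(X)         -> (3, 20, 1)  one weighted slice per degree 0, 1, 2
    #   m.phi(X, [1, 2]) -> (2, 20, 1)  only the requested degrees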
    def parameters_changed(self):
        tmp_outer = 0.
        for i in range(self.degree + 1):
            # Accumulate the model fit f = sum_i phi_i(X) w_i:
            tmp_X = self._phi(self.X, i)
            target_f = tmp_X.dot(self.weights[[i], :].T)
            tmp_outer += target_f
        tmp_outer = (self.Y - tmp_outer)
        for i in range(self.degree + 1):
            tmp_X = self._phi(self.X, i)
            # Gradient:
            # Note that the weights' gradients were already updated
            # in the regularizer. So here we update
            # and add in the gradient of the squared error.
            self.weights.gradient[i] -= 2 * (tmp_outer * tmp_X).sum(0)
        self._obj = (((tmp_outer) ** 2).sum() + self.regularizer.error.sum())
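
    # In formulas (a reading aid, not from the original module): with
    # f_n = \sum_i w_i \phi_i(x_n), parameters_changed above computes
    #     L(w) = \sum_n (y_n - f_n)^2 + error(w)
    # and accumulates the gradient
    #     dL/dw_i = -2 \sum_n (y_n - f_n) \phi_i(x_n) + d error/dw_i
    # into weights.gradient, whose regularizer part was already written by
    # Regularizer.update_error.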
    def objective_function(self):
        return self._obj

    def predict(self, Xnew):
        tmp_outer = 0.
        for i in range(self.degree + 1):
            # Accumulate the prediction f = sum_i phi_i(Xnew) w_i:
            tmp_X = self._phi(Xnew, i)
            tmp_outer += tmp_X.dot(self.weights[[i], :].T)
        return tmp_outer
class Basis(Parameterized):
    def __init__(self, degree, name='basis'):
        """
        Basis class for computing the design matrix phi(X). The weights are held
        in the regularizer, so this class only represents the design matrix.
        """
        super(Basis, self).__init__(name=name)
        self.degree = degree
        # One way of setting up the caching is the following; the
        # other is by the decorator.
        self._basis = Cacher(self.basis, self.degree + 1, [], [])
        # If we do not set up the caching ourselves, but use the decorator, we
        # cannot easily set the limit of the cacher to degree+1.
        # It could be done at runtime, by inspecting self.cache and setting
        # the limit of the right cacher, but that is more complex than necessary.
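        # The decorator route would look roughly like this (a sketch; it
        # assumes the Cache_this decorator in paramz.caching, and the exact
        # signature should be treated as an assumption):
        #
        #   from ..caching import Cache_this
        #
        #   @Cache_this(limit=3)  # limit is fixed at decoration time
        #   def basis(self, X, i):
        #       ...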
    def basis(self, X, i):
        """
        Return the ith basis dimension.
        In the polynomial case, this is X**i.
        You can write your own basis function by inheriting from this class;
        the gradient checks will still work.

        Note: i is zero for the first degree. This means the model
        also has a bias term, so an explicit bias is unnecessary.
        """
        raise NotImplementedError('Implement the basis you want to optimize over.')
class Polynomial(Basis):
    def __init__(self, degree, name='polynomial'):
        super(Polynomial, self).__init__(degree, name)

    def basis(self, X, i):
        return X ** i
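
# A hypothetical alternative basis (not part of the original module), showing
# the extension point described in Basis.basis. The choice exp(-i*X) is
# arbitrary; i == 0 again yields the constant bias dimension:
class Exponential(Basis):
    def __init__(self, degree, name='exponential'):
        super(Exponential, self).__init__(degree, name)

    def basis(self, X, i):
        # i-th basis dimension: exp(-i * X). It is never zero, so the weight
        # initialization in Regularizer.init stays well defined.
        return np.exp(-i * X)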
class Regularizer(Parameterized):
    def __init__(self, lambda_, name='regularizer'):
        super(Regularizer, self).__init__(name=name)
        self.lambda_ = lambda_
        self._initialized = False

    def init(self, basis, input_dim):
        if self._initialized:
            raise RuntimeError("already initialized, please use a new object.")
        weights = np.ones((basis.degree + 1, input_dim)) * np.arange(basis.degree + 1)[:, None]
        for i in range(basis.degree + 1):
            weights[[i]] /= basis.basis(weights[[i]], i)
        if not isinstance(weights, Param):
            if weights.ndim == 1:
                weights = weights[:, None]
            weights = Param('weights', weights)
        else:
            assert weights.ndim == 2, 'weights needs to be at least a column vector'
        self.weights = weights
        self.link_parameter(weights)
        self._initialized = True
        self.update_error()
    def parameters_changed(self):
        if self._initialized:
            self.update_error()

    def update_error(self):
        raise NotImplementedError('Set the error in `self.error` and the gradient of the weights in here.')
class Lasso(Regularizer):
    def __init__(self, lambda_, name='Lasso'):
        super(Lasso, self).__init__(lambda_, name)

    def update_error(self):
        self.error = self.lambda_ * np.sum(np.abs(self.weights), 1)
        self.weights.gradient[:] = self.lambda_ * np.sign(self.weights)
class Ridge(Regularizer):
    def __init__(self, lambda_, name='Ridge'):
        super(Ridge, self).__init__(lambda_, name)

    def update_error(self):
        self.error = self.lambda_ * np.sum(self.weights ** 2, 1)
        self.weights.gradient[:] = self.lambda_ * 2 * self.weights
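
if __name__ == '__main__':
    # A minimal usage sketch, not part of the original module. It assumes
    # Model provides checkgrad() and optimize(), as the gradient-based
    # optimization mentioned in the class docstring relies on. Because of the
    # relative imports, run it as
    #   python -m paramz.examples.ridge_regression
    np.random.seed(0)
    X = np.random.uniform(-1, 1, (40, 1))
    Y = .5 + 2 * X - .7 * X ** 2 + np.random.normal(0, .1, (40, 1))
    m = RidgeRegression(X, Y, regularizer=Ridge(.01), basis=Polynomial(2))
    m.checkgrad()   # numerically verify the analytic gradients
    m.optimize()    # gradient-based fit of the weights
    print(m.objective_function())
    print(m.weights)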