ass2.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": 48,
"metadata": {
"id": "ZfXIWZ57NkON"
},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import os\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {
"id": "nMdVQfNePLQJ"
},
"outputs": [],
"source": [
"class Elastic:\n",
" def __init__(self, preds, responses, lambdas, alphas, nf = 5, iters = 1000, to_ve
= False):\n",
"\n",
" self.x, self.y = self._shuffle_data(preds, responses)\n",
" self.lambdas = lambdas \n",
" self.alphas = alphas\n",
" self.nf = nf \n",
" self.iters = iters\n",
" self.to_ve
= to_ve
\n",
" self._initialize()\n",
"\n",
"\n",
" def _initialize(self):\n",
" '''\n",
" This function initializes certain values needed for training and checks that certain\n",
" arguments have appropriate values. \n",
" '''\n",
"\n",
" # get number of samples and number of features\n",
" self.n_samples = self.x.shape[0]\n",
" self.n_preds = self.x.shape[1]\n",
" \n",
" # matrix to store the cross validation results \n",
" self.cv_vals = np.zeros([self.nf, len(self.lambdas), len(self.alphas)])\n",
"\n",
" # determine the number of validation samples and their inds based on nf \n",
" self.n_val_samples = self.n_samples
self.nf \n",
" self.val_inds = list(range(0, self.n_samples, self.n_val_samples))\n",
" \n",
" if self.to_ve
:\n",
" print('[INFO] Using {} training and {} validation samples for each CV fold.'.format(\n",
" self.n_samples - self.n_val_samples, self.n_val_samples)\n",
" )\n",
"\n",
" # create a tensor to store the Betas \n",
" self.B_trained = np.zeros([self.nf, len(self.lambdas), len(self.alphas), self.n_preds])\n",
"\n",
" def _standardize(self, x, mean_vec, std_vec):\n",
"\n",
"\n",
" return (x - mean_vec) / std_vec \n",
"\n",
" def _center_responses(self, y, mean):\n",
"\n",
"\n",
" return y - mean\n",
"\n",
" def _shuffle_data(self, preds, responses):\n",
"\n",
"\n",
" data = np.concatenate((preds, responses[:, None]), 1)\n",
" np.random.shuffle(data)\n",
" return data[:, :-1], data[:, -1]\n",
"\n",
" def _initialize_B(self):\n",
"\n",
"\n",
" return np.random.uniform(low = -1, high = 1, size = (self.n_preds, 1))\n",
"\n",
" def predict(self, x):\n",
"\n",
"\n",
" assert(self.mean_vec is not None and self.std_vec is not None), \\\n",
" 'Model must be trained before predicting.'\n",
"\n",
" x = self._standardize(x, self.mean_vec, self.std_vec)\n",
" return np.matmul(x, self.B)\n",
"\n",
" def _get_folds(self, val_ind):\n",
" \n",
"\n",
" x_val = self.x[val_ind:val_ind + self.n_val_samples]\n",
" x_train = np.delete(self.x, np.arange(val_ind, val_ind + self.n_val_samples), axis = 0)\n",
" y_val = self.y[val_ind:val_ind + self.n_val_samples]\n",
" y_train = np.delete(self.y, np.arange(val_ind, val_ind + self.n_val_samples), axis = 0)\n",
" return x_train, x_val, y_train, y_val\n",
"\n",
" def _update(self, x, y, B, ss_cols, lambda_, alpha):\n",
"\n",
" \n",
" for k in range(self.n_preds):\n",
" # get rss without the effect of coefficient k\n",
" rss_k = y - np.matmul(x, B) + (x[:, k] * B[k])[:, None]\n",
" \n",
" # a_k is part of the derivative of the rss term in the loss function\n",
" a_k = np.matmul(x[:, k].transpose(), rss_k)[0]\n",
" \n",
" # update B_k\n",
" B_k = np.absolute(a_k) - lambda_ * (1 - alpha) / 2\n",
" B_k = B_k if B_k >= 0 else 0\n",
" B[k, 0] = np.sign(a_k) * B_k / (ss_cols[k] + lambda_ * alpha)\n",
" \n",
" return B\n",
"\n",
" def score(self, x, y, B):\n",
"\n",
"\n",
" y_hat = np.matmul(x, B)\n",
" mse = np.mean((y - y_hat) ** 2)\n",
" return mse\n",
" \n",
" def _compute_ss_cols(self, x):\n",
" '''\n",
" Computes the sum of squares for each column needed in the update.\n",
" Only need to do this once per fold at the beginning. \n",
" \n",
" Args:\n",
" x (np.nda
ay): The N x M design matrix of the cu
ent fold. \n",
" \n",
" Returns:\n",
" ss_cols (np.nda
ay): The M-dim vector representing the sum\n",
" of squares of the cols of x.\n",
" '''\n",
" return np.sum(x ** 2, 0)\n",
" \n",
" def _find_best_tuning_params(self):\n",
" '''\n",
" Finds the value of lambda and alpha with minimum co
esponding CV score and\n",
" saves them as class attributes. \n",
" '''\n",
" cv_mean = np.mean(self.cv_vals, 0)\n",
" best_lambda_ind, best_alpha_ind = np.where(cv_mean == np.amin(cv_mean))\n",
" self.best_lambda = self.lambdas[best_lambda_ind[0]]\n",
" self.best_alpha = self.alphas[best_alpha_ind[0]]\n",
"\n",
" def fit(self):\n",
" '''\n",
" Implements the cross validation and retrains the model with the optimal lambda \n",
" and alpha on the entire dataset.\n",
" '''\n",
"\n",
" for i_lambda, lambda_ in enumerate(self.lambdas): # loop through lambdas\n",
" for i_alpha, alpha in enumerate(self.alphas): # loop through alphas\n",
" for i_fold, val_ind in zip(range(self.nf), self.val_inds): # loop through folds\n",
" # get the folds\n",
" x_train, x_val, y_train, y_val = self._get_folds(val_ind)\n",
"\n",
" # standardize x and center y\n",
" mean_vec, std_vec = np.mean(x_train, 0), np.std(x_train, 0)\n",
" mean_response = np.mean(y_train)\n",
" x_train = self._standardize(x_train, mean_vec, std_vec)\n",
" x_val = self._standardize(x_val, mean_vec, std_vec)\n",
" y_train = self._center_responses(y_train, mean_response)[:, None]\n",
" y_val = self._center_responses(y_val, mean_response)[:, None]\n",
" \n",
" # compute b_k given this fold -- don't need to compute every update\n",
" ss_cols = self._compute_ss_cols(x_train)\n",
"\n",
" # initialize Beta for this lambda and fold\n",
" B = self._initialize_B()\n",
"\n",
" for iter in range(self.iters):\n",
" B = self._update(x_train, y_train, B, ss_cols, lambda_, alpha)\n",
" \n",
" # score this model \n",
" score = self.score(x_val, y_val, B)\n",
"\n",
" # store the score with the tuning param combinations\n",
" self.cv_vals[i_fold, i_lambda, i_alpha] = score\n",
"\n",
" # store the coefficient vector\n",
" self.B_trained[i_fold, i_lambda, i_alpha] = B[:, 0]\n",
" \n",
" # if to_ve
flag, then print out the mean CV MSE for the combo of lambda and alpha\n",
" if self.to_ve
:\n",
" print('lambda:{}; alpha:{}; CV MSE:{}'.format(\n",
" lambda_, alpha, np.mean(self.cv_vals[:, i_lambda, i_alpha]))\n",
" )\n",
"\n",
"\n",
" ############# Retrain on entire dataset with optimal lambda and alpha #############\n",
" \n",
" # find the best lambda and alpha\n",
" self._find_best_tuning_params()\n",
" \n",
" # standardize features of x and center responses \n",
" self.mean_vec, self.std_vec = np.mean(self.x, 0), np.std(self.x, 0)\n",
" x = self._standardize(self.x, self.mean_vec, self.std_vec)\n",
" y = self._center_responses(self.y, np.mean(self.y))[:, None]\n",
" \n",
" # compute the sum of squares for each feature on the entire dataset\n",
" ss_cols = self._compute_ss_cols(x)\n",
" \n",
" # initialize coefficients\n",
" self.B = self._initialize_B()\n",
" \n",
" # perform updates \n",
" for iter in range(self.iters):\n",
" self.B = self._update(x, y, self.B, ss_cols, self.best_lambda, self.best_alpha)"
]
},
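{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the coordinate-wise update performed by `_update` above, written out under this class's parameterization (soft threshold $\\lambda(1-\\alpha)/2$, ridge term $\\lambda\\alpha$):\n",
"\n",
"$$a_k = x_k^\\top\\big(y - X\\beta + x_k\\beta_k\\big), \\qquad \\beta_k \\leftarrow \\frac{\\operatorname{sign}(a_k)\\,\\max\\!\\big(|a_k| - \\tfrac{\\lambda(1-\\alpha)}{2},\\, 0\\big)}{\\sum_i x_{ik}^2 + \\lambda\\alpha}.$$\n",
"\n",
"The next cell is a minimal usage sketch of the class on synthetic data; the variable names, grid values, and random data are illustrative only and are not part of the assignment data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal usage sketch on synthetic data (illustrative values only).\n",
"import numpy as np\n",
"\n",
"rng = np.random.default_rng(0)\n",
"X_demo = rng.normal(size=(100, 5))\n",
"true_B = np.array([2.0, 0.0, -1.5, 0.0, 0.5])\n",
"y_demo = X_demo @ true_B + rng.normal(scale=0.1, size=100)\n",
"\n",
"# small illustrative grids for lambda and alpha\n",
"demo_model = Elastic(X_demo, y_demo, lambdas=[0.01, 0.1, 1.0],\n",
" alphas=[0.1, 0.5, 0.9], nf=5, iters=100)\n",
"demo_model.fit()\n",
"print('best lambda:', demo_model.best_lambda, 'best alpha:', demo_model.best_alpha)\n",
"print('first predictions:', demo_model.predict(X_demo[:3]).ravel())"
]
},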
{
"cell_type": "code",
"execution_count": 50,
"metadata": {
"colab": {
"base_uri": "https:
localhost:8080/",
"height": 419
},
"id": "2yKcu-xHOwnO",
"outputId": "6b6549d7-3105-4600-c9a1-1
42d9f335c"
},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"