Source code for spear.cage.core

import torch
from torch import optim
import pickle
from os import path as check_path
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

from ..utils.data_editor import get_data, get_classes, get_predictions
from ..utils.utils_cage import probability, log_likelihood_loss, precision_loss, predict_gm_labels

class Cage:
    '''
    Cage class: Class for Data Programming using CAGE.

    [Note: from here on, graphical model(gm) and CAGE algorithm terms are used interchangeably]

    Args:
        path_json: Path to json file consisting of number to string(class name) map
        n_lfs: number of labelling functions used to generate pickle files
    '''

    # Values of ``average`` that are accepted in ``metric_avg``.
    _METRIC_AVERAGES = ('micro', 'macro', 'samples', 'weighted', 'binary')

    # Continuous scores are kept strictly inside this closed interval.
    _SCORE_MIN, _SCORE_MAX = 0.001, 0.999

    def __init__(self, path_json, n_lfs):
        assert isinstance(path_json, str)
        assert self._is_real_number(n_lfs)

        use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if use_cuda else "cpu")
        torch.backends.cudnn.benchmark = True

        self.class_dict = get_classes(path_json)
        self.class_list = sorted(self.class_dict.keys())
        self.n_classes = len(self.class_dict)
        # Map every class value to its rank; None gets the extra index n_classes.
        self.class_map = {value: index for index, value in enumerate(self.class_list)}
        self.class_map[None] = self.n_classes

        self.n_lfs = int(n_lfs)
        # continuous_mask, labels of LFs; both are filled in by the first fit.
        self.n, self.k = None, None

        self.pi = torch.ones((self.n_classes, self.n_lfs), device=self.device).double()
        self.pi.requires_grad = True
        self.theta = torch.ones((self.n_classes, self.n_lfs), device=self.device).double()
        self.theta.requires_grad = True

    @staticmethod
    def _is_real_number(value):
        '''
        Return True if value is an int or float (builtin or numpy scalar).

        Booleans are rejected, as they were by the original exact-type checks.
        '''
        if isinstance(value, (bool, np.bool_)):
            return False
        return isinstance(value, (int, float, np.integer, np.floating))

    @staticmethod
    def _is_valid_quality(value):
        '''
        Return True if value is a usable quality guide/index: a real number in
        [0, 1] or a numpy.ndarray whose elements all lie in [0, 1].
        '''
        if isinstance(value, np.ndarray):
            return bool(np.all(np.logical_and(value >= 0, value <= 1)))
        if Cage._is_real_number(value):
            return bool(0 <= value <= 1)
        return False

    @staticmethod
    def _clip_scores(scores):
        '''
        Return scores limited to [_SCORE_MIN, _SCORE_MAX].
        '''
        return torch.clamp(scores, min=Cage._SCORE_MIN, max=Cage._SCORE_MAX)

    @staticmethod
    def _emit(log_file, message):
        '''
        Append message as one line to log_file, or print it when log_file is None.
        '''
        if log_file is not None:
            log_file.write(message + "\n")
        else:
            print(message)

    def save_params(self, save_path):
        '''
        member function to save parameters of Cage

        The file holds four consecutive pickles: theta, pi, n_classes, n_lfs.

        Args:
            save_path: path to pickle file to save parameters
        '''
        with open(save_path, 'wb') as file_:
            for item in (self.theta, self.pi, self.n_classes, self.n_lfs):
                pickle.dump(item, file_)
        return

    def load_params(self, load_path):
        '''
        member function to load parameters to Cage

        The instance is modified only after the file content has been checked
        against n_classes and n_lfs of this instance.

        Args:
            load_path: path to pickle file to load parameters

        Raises:
            AssertionError: if load_path does not exist or the stored values do
                not match this instance.
        '''
        assert check_path.exists(load_path)
        # NOTE(security): pickle can execute arbitrary code while loading;
        # only load parameter files that come from a trusted source.
        with open(load_path, 'rb') as file_:
            theta = pickle.load(file_)
            pi = pickle.load(file_)
            n_classes = pickle.load(file_)
            n_lfs = pickle.load(file_)

        # The loads above are done outside of the asserts so that they are not
        # skipped when python runs with -O.
        assert self.n_classes == n_classes
        assert self.n_lfs == n_lfs
        assert pi.shape == (self.n_classes, self.n_lfs)
        assert theta.shape == (self.n_classes, self.n_lfs)

        # Keep the parameters as trainable leaf tensors on the device of this instance.
        self.theta = theta.detach().to(self.device).requires_grad_(True)
        self.pi = pi.detach().to(self.device).requires_grad_(True)
        return

    def fit_and_predict_proba(self, path_pkl, path_test = None, path_log = None, qt = 0.9, qc = 0.85, metric_avg = ['binary'], n_epochs = 100, lr = 0.01):
        '''
        Train theta and pi on the data in path_pkl and return class probabilities for that data.

        Args:
            path_pkl: Path to pickle file of input data in standard format
            path_test: Path to the pickle file containing test data in standard format
            path_log: Path to log file. No log is produced if path_test is None. Default is None which prints accuracies/f1_scores to terminal
            qt: Quality guide of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.9
            qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85
            metric_avg: List of average metric to be used in calculating f1_score, default is ['binary']. Use None for not calculating f1_score
            n_epochs: Number of epochs, default is 100
            lr: Learning rate for torch.optim, default is 0.01

        Return:
            numpy.ndarray of shape (num_instances, num_classes) where i,j-th element is the probability of ith instance being the jth class(the jth value when sorted in ascending order of values in Enum)

        Raises:
            AssertionError: if an argument is invalid or the data does not match this instance.
        '''
        assert isinstance(path_pkl, str)
        assert self._is_valid_quality(qt)
        assert self._is_valid_quality(qc)
        # The default list is only read, never mutated. None means "no f1_score".
        # dict.fromkeys drops duplicates and keeps the order given by the caller.
        metric_avg_ = [] if metric_avg is None else list(dict.fromkeys(metric_avg))
        for temp in metric_avg_:
            assert temp in self._METRIC_AVERAGES
        assert self._is_real_number(n_epochs)
        assert self._is_real_number(lr)

        data = get_data(path_pkl, True, self.class_map)
        m = torch.abs(torch.tensor(data[2], device=self.device).long())
        s = torch.tensor(data[6], device=self.device).double()  # continuous score

        continuous_mask = torch.tensor(data[7], device=self.device).double()
        if self.n is None:
            self.n = continuous_mask  # Mask for s/continuous_mask
        else:
            assert torch.all(continuous_mask.eq(self.n))

        lf_classes = torch.tensor(data[8], device=self.device).long()
        if self.k is None:
            self.k = lf_classes  # LF's classes
        else:
            assert torch.all(lf_classes.eq(self.k))

        s = self._clip_scores(s)
        assert self.n_lfs == m.shape[1]
        assert self.n_classes == data[9]

        if isinstance(qt, np.ndarray):
            qt_ = torch.tensor(qt, device=self.device).double()
        else:
            qt_ = torch.ones(self.n_lfs, device=self.device).double() * qt
        qc_ = torch.tensor(qc, device=self.device).double() if isinstance(qc, np.ndarray) else qc
        n_epochs_ = int(n_epochs)

        optimizer = optim.Adam([self.theta, self.pi], lr=lr, weight_decay=0)

        evaluate = path_test is not None
        log_file = None
        try:
            y_true_test = None
            s_test, m_test = None, None
            if evaluate:
                if path_log is not None:
                    log_file = open(path_log, "a+")
                self._emit(log_file, "CAGE log:\tn_classes: {}\tn_LFs: {}\tn_epochs: {}\tlr: {}".format(self.n_classes, self.n_lfs, n_epochs, lr))

                data = get_data(path_test, True, self.class_map)
                m_test, y_true_test, s_test = data[2], data[3], data[6]
                assert m_test.shape[0] == y_true_test.shape[0]
                y_true_test = y_true_test.flatten()
                assert self.n_lfs == m_test.shape[1]
                assert self.n_classes == data[9]
                assert torch.all(torch.tensor(data[7], device=self.device).double().eq(self.n))
                assert torch.all(torch.tensor(data[8], device=self.device).long().eq(self.k))
                assert np.all(np.logical_and(y_true_test >= 0, y_true_test < self.n_classes))

            with tqdm(total=n_epochs_) as pbar:
                for epoch in range(n_epochs_):
                    optimizer.zero_grad()
                    loss = log_likelihood_loss(self.theta, self.pi, m, s, self.k, self.n_classes, self.n, qc_, self.device)
                    prec_loss = precision_loss(self.theta, self.k, self.n_classes, qt_, self.device)
                    loss += prec_loss

                    if evaluate:
                        # Metrics are computed with the parameters as they are
                        # before the optimizer step of this epoch.
                        is_last_epoch = epoch == n_epochs_ - 1
                        y_pred = self.__predict_specific(m_test, s_test, qc_)

                        accuracy = accuracy_score(y_true_test, y_pred)
                        self._emit(log_file, "Epoch: {}\ttest_accuracy_score: {}".format(epoch, accuracy))
                        if is_last_epoch:
                            print("final_test_accuracy_score: {}".format(accuracy))

                        for temp in metric_avg_:
                            f1 = f1_score(y_true_test, y_pred, average=temp)
                            self._emit(log_file, "Epoch: {}\ttest_average_metric: {}\ttest_f1_score: {}".format(epoch, temp, f1))
                            if is_last_epoch:
                                print("test_average_metric: {}\tfinal_test_f1_score: {}".format(temp, f1))

                    loss.backward()
                    optimizer.step()
                    pbar.update()
        finally:
            # Close the log even when loading, validation or training fails.
            if log_file is not None:
                log_file.close()

        return (probability(self.theta, self.pi, m, s, self.k, self.n_classes, self.n, qc_, self.device)).cpu().detach().numpy()

    def fit_and_predict(self, path_pkl, path_test = None, path_log = None, qt = 0.9, qc = 0.85, metric_avg = ['binary'], n_epochs = 100, lr = 0.01, need_strings = False):
        '''
        Train on the data in path_pkl and return the aggregated labels for that data.

        Args:
            path_pkl: Path to pickle file of input data in standard format
            path_test: Path to the pickle file containing test data in standard format
            path_log: Path to log file. No log is produced if path_test is None. Default is None which prints accuracies/f1_scores to terminal
            qt: Quality guide of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.9
            qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85
            metric_avg: List of average metric to be used in calculating f1_score, default is ['binary']. Use None for not calculating f1_score
            n_epochs: Number of epochs, default is 100
            lr: Learning rate for torch.optim, default is 0.01
            need_strings: If True, the output will be in the form of strings(class names). Else it is in the form of class values(given to classes in Enum). Default is False

        Return:
            numpy.ndarray of shape (num_instances,) which are aggregated/predicted labels. Elements are numbers/strings depending on need_strings attribute is false/true resp.
        '''
        assert isinstance(need_strings, (bool, np.bool_))
        proba = self.fit_and_predict_proba(path_pkl, path_test, path_log, qt, qc, metric_avg, n_epochs, lr)
        return get_predictions(proba, self.class_map, self.class_dict, need_strings)

    def __predict_specific(self, m_test, s_test, qc_):
        '''
        Used to predict labels based on s_test and m_test

        Args:
            m_test: numpy arrays of shape (num_instances, num_rules), m_test[i][j] is 1 if jth LF is triggered on ith instance, else it is 0
            s_test: numpy arrays of shape (num_instances, num_rules), s_test[i][j] is the continuous score of jth LF on ith instance
            qc_: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1

        Return:
            numpy.ndarray of shape (num_instances,) which are predicted labels. Note that here the class labels appearing may not be the ones used in the Enum
            [Note: no aggregration/algorithm-running will be done using the current input]
        '''
        s_temp = self._clip_scores(torch.tensor(s_test, device=self.device).double())
        assert m_test.shape == s_test.shape
        assert m_test.shape[1] == self.n_lfs
        assert np.all(np.logical_or(m_test == 1, m_test == 0))
        m_temp = torch.abs(torch.tensor(m_test, device=self.device).long())
        return predict_gm_labels(self.theta, self.pi, m_temp, s_temp, self.k, self.n_classes, self.n, qc_, self.device)

    def predict_proba(self, path_test, qc = 0.85):
        '''
        Used to predict labels based on a pickle file with path path_test

        Args:
            path_test: Path to the pickle file containing test data set in standard format
            qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85

        Return:
            numpy.ndarray of shape (num_instances, num_classes) where i,j-th element is the probability of ith instance being the jth class(the jth value when sorted in ascending order of values in Enum)
            [Note: no aggregration/algorithm-running will be done using the current input]
        '''
        assert self._is_valid_quality(qc)

        data = get_data(path_test, True, self.class_map)
        assert (data[2]).shape[1] == self.n_lfs and data[9] == self.n_classes

        # n and k stay None until a fit has run; the values of the given file
        # are then used without comparison.
        temp_k = torch.tensor(data[8], device=self.device).long()
        assert self.k is None or torch.all(temp_k.eq(self.k))
        temp_n = torch.tensor(data[7], device=self.device).double()
        assert self.n is None or torch.all(temp_n.eq(self.n))

        s_test = self._clip_scores(torch.tensor(data[6], device=self.device).double())
        m_test = torch.abs(torch.tensor(data[2], device=self.device).long())
        # The tensor is created on the device of the parameters, as in fit_and_predict_proba.
        qc_ = torch.tensor(qc, device=self.device).double() if isinstance(qc, np.ndarray) else qc

        if self.n is None or self.k is None:
            print("Warning: Predict is used before training any paramters in Cage class. Hope you have loaded parameters.")

        return (probability(self.theta, self.pi, m_test, s_test, temp_k, self.n_classes, temp_n, qc_, self.device)).cpu().detach().numpy()

    def predict(self, path_test, qc = 0.85, need_strings = False):
        '''
        Used to predict labels based on a pickle file with path path_test

        Args:
            path_test: Path to the pickle file containing test data set in standard format
            qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85
            need_strings: If True, the output will be in the form of strings(class names). Else it is in the form of class values(given to classes in Enum). Default is False

        Return:
            numpy.ndarray of shape (num_instances,) which are predicted labels. Elements are numbers/strings depending on need_strings attribute is false/true resp.
            [Note: no aggregration/algorithm-running will be done using the current input]
        '''
        assert isinstance(need_strings, (bool, np.bool_))
        return get_predictions(self.predict_proba(path_test, qc), self.class_map, self.class_dict, need_strings)