Source code for spear.jl.subset_selection

from submodlib.functions.facilityLocation import FacilityLocationFunction

import numpy as np
import torch
from sklearn.metrics.pairwise import euclidean_distances
import pickle
from os import path as check_path

from ..utils.data_editor import get_data, get_classes
from ..utils.utils_cage import predict_gm_labels
from ..utils.utils_jl import find_indices, get_similarity_kernel

def rand_subset(n_all, n_instances):
    '''
    A function to choose random indices of the input instances to be labelled

    Args:
        n_all: number of available instances, type is integer
        n_instances: number of instances to be labelled, type is integer

    Return:
        A numpy.ndarray of the indices (shape (n_instances,), each element in the range [0, n_all)) to be labelled
    '''
    assert type(n_all) == int or type(n_all) == float
    assert type(n_instances) == int or type(n_instances) == float
    assert int(n_all) > int(n_instances)

    return np.sort(np.random.choice(int(n_all), int(n_instances), replace = False))
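
# Example usage of rand_subset (a minimal sketch; the pool size of 1000 and the
# budget of 50 are hypothetical values):
#
#   labeled_idx = rand_subset(1000, 50)
#   # labeled_idx: sorted numpy.ndarray of 50 distinct indices in [0, 1000)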
def unsup_subset(x_train, n_unsup):
    '''
    A function for unsupervised subset selection (the subset to be labelled)

    Args:
        x_train: A numpy.ndarray of shape (n_instances, n_features). All the data, intended to be used for training
        n_unsup: number of instances to be found during unsupervised subset selection, type is integer

    Return:
        numpy.ndarray of indices (shape (n_unsup,), each element in [0, x_train.shape[0])), the result of subset selection
    '''
    assert type(x_train) == np.ndarray
    assert x_train.shape[0] > int(n_unsup)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    torch.backends.cudnn.benchmark = True

    #fl = apricot.functions.facilityLocation.FacilityLocationSelection(random_state = 0, n_samples = int(n_unsup))
    #x_sub = fl.fit_transform(x_train)
    #indices = find_indices(torch.from_numpy(x_train).to(device = device), torch.from_numpy(np.array(x_sub)).to(device = device))

    # facility-location maximisation picks n_unsup representative instances
    fl = FacilityLocationFunction(n = x_train.shape[0], mode = "dense", data = x_train, metric = "euclidean")
    x_sub = fl.maximize(budget = int(n_unsup), optimizer = 'LazyGreedy', stopIfZeroGain = False, stopIfNegativeGain = False, verbose = False)
    indices = np.array([i[0] for i in x_sub])

    return np.sort(indices)
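
# Example usage of unsup_subset (a minimal sketch; the random feature matrix is a
# stand-in for real training features):
#
#   x_train = np.random.rand(500, 20)            # 500 instances, 20 features
#   sub_idx = unsup_subset(x_train, 30)
#   x_to_label = x_train[sub_idx]                # the 30 representatives to be labelled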
def sup_subset(path_json, path_pkl, n_sup, qc = 0.85):
    '''
    A helper function for supervised subset selection (the subset to be labelled) which returns the selected indices along with the data loaded from path_pkl

    Args:
        path_json: Path to json file of number to string (class name) map
        path_pkl: Path to the pickle file containing all the training data in standard format
        n_sup: Number of instances to be found during supervised subset selection
        qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85

    Return:
        numpy.ndarray of indices (shape (n_sup,), each element in [0, num_instances)), the result of subset selection, AND the data, a list of the contents of path_pkl
    '''
    assert (type(qc) == float and 0 <= qc <= 1) or \
           (type(qc) == np.ndarray and np.all(np.logical_and(qc >= 0, qc <= 1))) or \
           (type(qc) == int and (qc == 0 or qc == 1))

    class_dict = get_classes(path_json)
    class_list = list(class_dict.keys())
    class_list.sort()
    n_classes = len(class_dict)

    class_map = {value: index for index, value in enumerate(class_list)}
    class_map[None] = n_classes

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    torch.backends.cudnn.benchmark = True

    data = get_data(path_pkl, True, class_map)
    m = torch.abs(torch.tensor(data[2], device = device).long())
    s = torch.tensor(data[6], device = device).double() # continuous score
    assert m.shape[0] > int(n_sup)
    k = torch.tensor(data[8], device = device).long() # LF's classes
    n_lfs = m.shape[1]
    continuous_mask = torch.tensor(data[7], device = device).double() # mask for s/continuous scores
    qc_temp = torch.tensor(qc, device = device).double() if type(qc) == np.ndarray else qc

    params_1 = torch.ones((n_classes, n_lfs), device = device).double() # initialisation of gm parameters, refer section 3.4 in the JL paper
    params_2 = torch.ones((n_classes, n_lfs), device = device).double()

    y_train_pred = predict_gm_labels(params_1, params_2, m, s, k, n_classes, continuous_mask, qc_temp, device)
    kernel = get_similarity_kernel(y_train_pred)
    similarity = euclidean_distances(data[0])
    sim_mat = kernel * similarity

    #fl = apricot.functions.facilityLocation.FacilityLocationSelection(random_state = 0, metric = 'precomputed', n_samples = int(n_sup))
    #sim_sub = fl.fit_transform(sim_mat)
    #indices = find_indices(torch.from_numpy(sim_mat).to(device = device), torch.from_numpy(np.array(sim_sub)).to(device = device))

    # facility-location maximisation over the precomputed similarity matrix
    fl = FacilityLocationFunction(n = sim_mat.shape[0], mode = "dense", sijs = sim_mat, separate_rep = False)
    sim_sub = fl.maximize(budget = int(n_sup), optimizer = 'LazyGreedy', stopIfZeroGain = False, stopIfNegativeGain = False, verbose = False)
    indices = np.array([i[0] for i in sim_sub])

    return np.sort(indices), data
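
# Example usage of sup_subset (a minimal sketch; both paths are hypothetical and must
# point to a class-map json and a pickle in SPEAR's standard format):
#
#   indices, data = sup_subset("data/classes.json", "data/train.pkl", 100)
#   x_to_label = data[0][indices]                # features of the selected instances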
def sup_subset_indices(path_json, path_pkl, n_sup, qc = 0.85):
    '''
    A function for supervised subset selection (the subset to be labelled) which just returns indices

    Args:
        path_json: Path to json file of number to string (class name) map
        path_pkl: Path to the pickle file containing all the training data in standard format
        n_sup: Number of instances to be found during supervised subset selection
        qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85

    Return:
        numpy.ndarray of indices (shape (n_sup,), each element in [0, num_instances)), the result of subset selection
    '''
    indices, _ = sup_subset(path_json, path_pkl, n_sup, qc)
    return indices
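
# Example usage of sup_subset_indices (same hypothetical paths as above; only the
# indices are returned, the loaded data is discarded):
#
#   indices = sup_subset_indices("data/classes.json", "data/train.pkl", 100, qc = 0.85)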
def sup_subset_save_files(path_json, path_pkl, path_save_L, path_save_U, n_sup, qc = 0.85):
    '''
    A function for supervised subset selection (the subset to be labelled) which makes separate pickle files of data, one for the instances to be labelled, the other for those that can be left unlabelled

    Args:
        path_json: Path to json file of number to string (class name) map
        path_pkl: Path to the pickle file containing all the training data in standard format
        path_save_L: Path to save the pickle file of the set of instances to be labelled. Note that the instances are not labelled yet. Extension should be .pkl
        path_save_U: Path to save the pickle file of the set of instances that can be left unlabelled. Extension should be .pkl
        n_sup: number of instances to be found during supervised subset selection
        qc: Quality index of shape (n_lfs,) of type numpy.ndarray OR a float. Values must be between 0 and 1. Default is 0.85

    Return:
        numpy.ndarray of indices (shape (n_sup,), each element in [0, num_instances)), the result of subset selection. Also, two pickle files are saved, at path_save_L and path_save_U
    '''
    indices, data = sup_subset(path_json, path_pkl, n_sup, qc)
    false_mask = np.ones(data[0].shape[0], dtype = bool)
    false_mask[indices] = False

    save_file_L = open(path_save_L, 'wb')
    save_file_U = open(path_save_U, 'wb')

    for i in range(10):
        if i < 7: # per-instance arrays are split between the two files
            if data[i].shape[0] == 0:
                pickle.dump(data[i], save_file_L)
                pickle.dump(data[i], save_file_U)
            else:
                pickle.dump(data[i][indices], save_file_L)
                pickle.dump(data[i][false_mask], save_file_U)
        else: # per-LF arrays are copied to both files as-is
            pickle.dump(data[i], save_file_L)
            pickle.dump(data[i], save_file_U)

    save_file_L.close()
    save_file_U.close()

    return indices
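
# Example usage of sup_subset_save_files (a minimal sketch with hypothetical paths;
# the two pickle files are written as a side effect):
#
#   indices = sup_subset_save_files("data/classes.json", "data/train.pkl",
#                                   "data/train_L.pkl", "data/train_U.pkl", 100)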
def replace_in_pkl(path, path_save, np_array, index):
    '''
    A function to replace one of the numpy arrays, at a given index, in a pickle file of data in standard format

    Args:
        path: Path to the pickle file containing all the data in standard format
        path_save: Path to save the pickle file after replacing the array at the given index of the data in the path pickle file
        np_array: The numpy array which is to replace the array at the given index in the path pickle file
        index: Index of the numpy array, in the data of the path pickle file, to be replaced with np_array. Value should be in [0,8]

    Return:
        No return value. A pickle file is generated at path_save
    '''
    assert type(index) == int and index >= 0 and index < 9
    assert check_path.exists(path) # check_path is os.path, imported above

    data = []
    with open(path, 'rb') as file:
        for i in range(9):
            data.append(pickle.load(file))
            assert type(data[i]) == np.ndarray
        data.append(pickle.load(file))

    assert data[index].shape[0] == 0 or np_array.shape == data[index].shape

    save_file = open(path_save, 'wb')
    for i in range(10):
        if i == index:
            pickle.dump(np_array, save_file)
        else:
            pickle.dump(data[i], save_file)
    save_file.close()

    return
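
# Example usage of replace_in_pkl (a minimal sketch; the paths and the (100, 1) array
# are hypothetical, and index 3 is the slot insert_true_labels below treats as the
# true labels 'L'):
#
#   new_labels = np.zeros((100, 1))
#   replace_in_pkl("data/train_L.pkl", "data/train_L_new.pkl", new_labels, 3)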
def insert_true_labels(path, path_save, labels):
    '''
    A function to insert the true labels, after labelling the instances, into the pickle file

    Args:
        path: Path to the pickle file containing all the data in standard format
        path_save: Path to save the pickle file after replacing the 'L' (true labels numpy array) of the data in the path pickle file
        labels: The true labels of the data in the pickle file. numpy.ndarray of shape (num_instances, 1)

    Return:
        No return value. A pickle file is generated at path_save
    '''
    assert check_path.exists(path) # check_path is os.path, imported above

    data = []
    with open(path, 'rb') as file:
        for i in range(9):
            data.append(pickle.load(file))
            assert type(data[i]) == np.ndarray
        data.append(pickle.load(file))

    assert labels.shape[0] == data[0].shape[0] and labels.shape[1] == 1

    save_file = open(path_save, 'wb')
    for i in range(10):
        if i == 3: # index 3 holds the true labels ('L') in the standard format
            pickle.dump(labels, save_file)
        elif i == 4: # the array at index 4 is overwritten with ones of matching length
            pickle.dump(np.ones([labels.size, 1]), save_file)
        else:
            pickle.dump(data[i], save_file)
    save_file.close()

    return
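
# Example usage of insert_true_labels (a minimal sketch; the placeholder binary labels
# stand in for labels collected by annotating the selected subset):
#
#   true_labels = np.random.randint(0, 2, size = (100, 1))
#   insert_true_labels("data/train_L.pkl", "data/train_L_labeled.pkl", true_labels)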