Source code for languagechange.models.representation.static

import subprocess
import numpy as np
from abc import ABC, abstractmethod
from typing import List, Union
from languagechange.usages import TargetUsage
from languagechange.corpora import LinebyLineCorpus
from LSCDetection.modules.utils_ import Space
import os
from collections import defaultdict
import logging
import time
from scipy.sparse import dok_matrix
from gensim.models.word2vec import PathLineSentences
from sklearn.utils.extmath import randomized_svd
from sklearn.random_projection import SparseRandomProjection
from scipy.sparse import csr_matrix
# Snapshot of the process environment, taken once when the module is imported.
env = os.environ.copy()
import logging

# Install a basic root logging configuration: INFO level, with each record
# showing timestamp, logger name, severity and message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the model classes in this file.
logger = logging.getLogger(__name__)


class RepresentationModel(ABC):
    """
    Common abstract interface for every representation model.

    Concrete models supply their own ``encode`` implementation.
    """

    @abstractmethod
    def encode(self, *args, **kwargs):
        """
        Encode data into a vector representation.

        This is only a template; subclasses are required to override it.
        """
# todo
class StaticModel(RepresentationModel, dict):
    """
    Base class for static word embedding models.

    Manages loading and accessing a vector space stored on disk.

    Attributes:
        space: The loaded ``Space`` object, or ``None`` until ``load`` is called.
        matrix_path (str): Path to the matrix file.
        format (str): Format of the matrix file (e.g., 'w2v', 'npz').
    """

    def __init__(self, matrix_path=None, format='w2v'):
        """
        Store the location and format of the matrix; nothing is loaded yet.

        Args:
            matrix_path (str, optional): Path to the matrix file.
            format (str): Format of the matrix file. Defaults to 'w2v'.
        """
        logger.info("Initializing StaticModel")
        self.space = None
        self.matrix_path = matrix_path
        self.format = format

    @abstractmethod
    def encode(self):
        """
        Perform the encoding operation.

        Must be implemented in subclasses.
        """

    def load(self):
        """
        Load the vector space from ``self.matrix_path`` into ``self.space``.

        This method is deliberately not abstract: it has a complete
        implementation that the subclasses inherit and call as-is.
        """
        logger.info("Loading vector space from %s", self.matrix_path)
        self.space = Space(self.matrix_path, format=self.format)

    def __getitem__(self, k):
        """
        Get the vector representation for a given word.

        Args:
            k (str): The word to look up.

        Returns:
            The row of the space matrix associated with ``k``.

        Raises:
            Exception: If the space is not loaded.
        """
        # Identity check: `==` would be routed through any custom __eq__
        # defined by the space object.
        if self.space is None:
            logger.error('Space is not loaded')
            raise Exception('Space is not loaded')
        return self.space.matrix[self.space.row2id[k]]

    def matrix(self):
        """
        Retrieve the entire matrix of word vectors.

        Returns:
            The ``matrix`` attribute of the loaded space.

        Raises:
            Exception: If the space is not loaded.
        """
        if self.space is None:
            raise Exception('Space is not loaded')
        return self.space.matrix

    def row2word(self):
        """
        Retrieve the mapping of row indices to words.

        Returns:
            The ``id2row`` attribute of the loaded space.

        Raises:
            Exception: If the space is not loaded.
        """
        if self.space is None:
            raise Exception('Space is not loaded')
        return self.space.id2row
class CountModel(StaticModel):
    """
    Count-based word embedding model that builds a co-occurrence matrix from a corpus.

    Attributes:
        corpus (LinebyLineCorpus): The corpus to process.
        window_size (int): The size of the context window.
        savepath (str): Path to save the generated matrix (always ends in '.npz').
    """

    def __init__(self, corpus: LinebyLineCorpus, window_size: int, savepath: str):
        """
        Args:
            corpus (LinebyLineCorpus): The corpus to count co-occurrences in.
            window_size (int): Number of context words considered on each side.
            savepath (str): Output path; '.npz' is appended when missing.
        """
        super(CountModel, self).__init__()
        self.corpus = corpus
        self.window_size = window_size
        # The matrix is stored in npz format, so force a matching extension.
        if not savepath.endswith('.npz'):
            savepath += '.npz'
        self.savepath = savepath
        self.format = 'npz'
        self.matrix_path = os.path.join(self.savepath)

    def encode(self, is_len=False):
        """
        Build a co-occurrence matrix from the corpus and save it to ``savepath``.

        If a file already exists at ``savepath`` it is loaded instead of being
        recomputed; if loading fails the file is removed and the matrix is
        rebuilt. (Counting code adapted from LSCDetection.)

        Args:
            is_len (bool): When True, L2-normalize the vectors before saving.
        """
        # Reuse a previously saved matrix when possible.
        if os.path.exists(self.savepath):
            try:
                logger.info("Loading cached count matrix from %s", self.savepath)
                self.load()
                return
            except Exception as e:
                # Best effort: an unreadable cache is discarded and rebuilt.
                logger.error("Cache loading failed: %s, regenerating matrix...", e)
                os.remove(self.savepath)

        start_time = time.time()

        # Build vocabulary. One-word sentences are skipped to avoid zero-vectors.
        logger.info("Building vocabulary")
        sentences = PathLineSentences(self.corpus.path)
        vocabulary = sorted({
            word
            for sentence in sentences
            if len(sentence) > 1
            for word in sentence
        })
        w2i = {w: i for i, w in enumerate(vocabulary)}

        # Co-occurrence counts keyed by (target index, context index).
        cooc_mat = defaultdict(int)

        # Second pass over the corpus to collect the counts.
        sentences = PathLineSentences(self.corpus.path)
        logger.info("Counting context words")
        for sentence in sentences:
            for i, word in enumerate(sentence):
                lower = max(i - self.window_size, 0)
                upper = min(i + self.window_size, len(sentence))
                window = sentence[lower:i] + sentence[i + 1:upper + 1]
                if not window:
                    # No context available (e.g. one-word sentence).
                    continue
                windex = w2i[word]
                for context_word in window:
                    cooc_mat[(windex, w2i[context_word])] += 1

        # Convert dictionary to sparse matrix.
        logger.info("Converting dictionary to matrix")
        cooc_mat_sparse = dok_matrix((len(vocabulary), len(vocabulary)), dtype=float)
        try:
            cooc_mat_sparse.update(cooc_mat)
        except NotImplementedError:
            # Some SciPy versions reject the public dict-style update;
            # fall back to the private bulk-update hook.
            cooc_mat_sparse._update(cooc_mat)

        outSpace = Space(matrix=cooc_mat_sparse, rows=vocabulary, columns=vocabulary)

        if is_len:
            # L2-normalize vectors
            outSpace.l2_normalize()

        # Save the matrix
        outSpace.save(self.savepath)

        logger.info("--- %s seconds ---", time.time() - start_time)
class PPMI(CountModel):
    """
    Positive Pointwise Mutual Information (PPMI) model that transforms a co-occurrence matrix.

    Attributes:
        count_model (CountModel): The count-based model to transform.
        shifting_parameter (int): Parameter to shift values after applying log weighting.
        smoothing_parameter (int): Parameter to smooth the matrix values.
        savepath (str): Path to save the PPMI matrix.
        align_strategies (set): Names of the alignment strategies associated with this model.
    """

    def __init__(self, count_model: CountModel, shifting_parameter: int, smoothing_parameter: int, savepath: str):
        """
        Args:
            count_model (CountModel): Model whose saved count matrix is transformed.
            shifting_parameter (int): Shift applied after log weighting.
            smoothing_parameter (int): Smoothing passed to the EPMI weighting.
            savepath (str): Path to save the PPMI matrix.
        """
        logger.info("Initializing PPMI model")
        # Hand the count model's own corpus to the parent constructor; the
        # instance under construction is not a corpus.
        super(PPMI, self).__init__(count_model.corpus, count_model.window_size, count_model.savepath)
        self.count_model = count_model
        self.shifting_parameter = shifting_parameter
        self.smoothing_parameter = smoothing_parameter
        # NOTE(review): unlike CountModel, no '.npz' suffix is enforced on
        # this path -- confirm whether that is intended.
        self.savepath = savepath
        # self.format stays 'npz', as set by CountModel.__init__.
        self.matrix_path = os.path.join(self.savepath)
        self.align_strategies = {'OP', 'SRV', 'WI'}

    def encode(self, is_len=False):
        """
        Compute the smoothed and shifted PPMI matrix from a co-occurrence matrix.

        Smoothing is performed as described in

          Omer Levy, Yoav Goldberg, and Ido Dagan. 2015. Improving
          distributional similarity with lessons learned from word
          embeddings. Trans. ACL, 3.

        If a file already exists at ``savepath`` it is loaded instead of being
        recomputed; if loading fails the file is removed and the matrix is
        recomputed. (Transformation code adapted from LSCDetection.)

        Args:
            is_len (bool): When True, L2-normalize the vectors before saving.
        """
        # Reuse a previously saved matrix when possible.
        if os.path.exists(self.savepath):
            try:
                logger.info("Loading cached PPMI matrix from %s", self.savepath)
                self.load()
                return
            except Exception as e:
                # Best effort: an unreadable cache is discarded and rebuilt.
                logger.error("Cache loading failed: %s, recomputing PPMI...", e)
                os.remove(self.savepath)

        logger.info("Starting PPMI encoding process")
        start_time = time.time()

        # Load input matrix
        space = Space(self.count_model.matrix_path)

        logger.info("Applying transformations: EPMI weighting, log weighting, shifting")
        # Apply EPMI weighting
        space.epmi_weighting(self.smoothing_parameter)
        # Apply log weighting
        space.log_weighting()
        # Shift values
        space.shifting(self.shifting_parameter)
        # Eliminate negative counts
        space.eliminate_negative()
        # Eliminate zero counts
        space.eliminate_zeros()

        outSpace = Space(matrix=space.matrix, rows=space.rows, columns=space.columns)

        if is_len:
            # L2-normalize vectors
            outSpace.l2_normalize()

        # Save the matrix
        outSpace.save(self.savepath)

        logger.info("PPMI encoding completed in %s seconds", time.time() - start_time)
class SVD(StaticModel):
    """
    Singular Value Decomposition (SVD) model that reduces the dimensionality of a matrix.

    Attributes:
        count_model (CountModel): The input count-based model.
        dimensionality (int): Target dimensionality for the reduced matrix.
        gamma (float): Weighting parameter for singular values.
        savepath (str): Path to save the reduced matrix (always ends in '.w2v').
        align_strategies (set): Names of the alignment strategies associated with this model.
    """

    def __init__(self, count_model: CountModel, dimensionality: int, gamma: float, savepath: str):
        """
        Args:
            count_model (CountModel): Model whose saved matrix is reduced.
            dimensionality (int): Number of components kept by the SVD.
            gamma (float): Exponent applied to the singular values.
            savepath (str): Output path; '.w2v' is appended when missing.
        """
        logger.info("Initializing SVD model")
        super(SVD, self).__init__()
        self.count_model = count_model
        self.dimensionality = dimensionality
        self.gamma = gamma
        # The reduced matrix is stored in w2v format, so force a matching extension.
        if not savepath.endswith('.w2v'):
            savepath += '.w2v'
        self.savepath = savepath
        self.matrix_path = os.path.join(self.savepath)
        self.format = 'w2v'
        self.align_strategies = {'OP', 'SRV', 'WI'}

    def encode(self, is_len=False):
        """
        Perform dimensionality reduction on a (normally PPMI) matrix by applying truncated SVD.

        Follows

          Omer Levy, Yoav Goldberg, and Ido Dagan. 2015. Improving
          distributional similarity with lessons learned from word
          embeddings. Trans. ACL, 3.

        If a file already exists at ``savepath`` it is loaded instead of being
        recomputed; if loading fails the file is removed and the decomposition
        is recomputed. (Reduction code adapted from LSCDetection.)

        Args:
            is_len (bool): When True, L2-normalize the vectors before saving.
        """
        # Reuse a previously saved matrix when possible.
        if os.path.exists(self.savepath):
            try:
                logger.info("Loading cached SVD matrix from %s", self.savepath)
                self.load()
                return
            except Exception as e:
                # Best effort: an unreadable cache is discarded and rebuilt.
                logger.error("Cache loading failed: %s, recomputing SVD decomposition...", e)
                os.remove(self.savepath)

        logger.info("Starting SVD encoding process")
        start_time = time.time()

        # Load input matrix
        space = Space(self.count_model.matrix_path)

        # Apply SVD
        logger.info("Applying truncated SVD")
        u, s, _ = randomized_svd(
            space.matrix,
            n_components=self.dimensionality,
            n_iter=5,
            transpose=False,
        )

        # Weight the left singular vectors by s ** gamma. `s` is the flattened
        # diagonal, so broadcasting `s * u` equals np.dot(u, np.diag(s)).
        if self.gamma == 0.0:
            matrix_reduced = u
        elif self.gamma == 1.0:
            matrix_reduced = s * u
        else:
            matrix_reduced = np.power(s, self.gamma) * u

        outSpace = Space(matrix=matrix_reduced, rows=space.rows, columns=[])

        if is_len:
            # L2-normalize vectors
            outSpace.l2_normalize()

        # Save the matrix
        outSpace.save(self.savepath, format='w2v')

        logger.info("SVD encoding completed in %s seconds", time.time() - start_time)
class RandomIndexing(StaticModel):
    """
    Random Indexing model that creates low-dimensional vector spaces from a co-occurrence matrix.

    Attributes:
        count_model: Model whose ``matrix_path`` points at the input matrix.
        dimensionality (int): Number of components of the random projection.
        savepath (str): Path to save the reduced matrix ('.w2v' is appended when missing).
        align_strategies (set): Names of the alignment strategies associated with this model.
    """

    def __init__(self, count_model=None, dimensionality=None, savepath=None):
        """
        All arguments are optional so that ``RandomIndexing()`` keeps working,
        but every one of them must be set before ``encode`` is called.

        NOTE(review): the parameters mirror ``SVD`` (input model, target
        dimensionality, output path) -- confirm this is the intended input.

        Args:
            count_model: Model providing ``matrix_path`` of the input matrix.
            dimensionality (int): Number of components of the projection.
            savepath (str): Output path; '.w2v' is appended when missing.
        """
        logger.info("Initializing RandomIndexing model")
        super(RandomIndexing, self).__init__()
        self.count_model = count_model
        self.dimensionality = dimensionality
        # The reduced matrix is stored in w2v format, so force a matching extension.
        if savepath is not None and not savepath.endswith('.w2v'):
            savepath += '.w2v'
        self.savepath = savepath
        self.matrix_path = savepath
        self.format = 'w2v'
        self.align_strategies = {'OP', 'SRV', 'WI'}

    def encode(self, is_len=False):
        """
        Create low-dimensional vector space by sparse random indexing from co-occurrence matrix.

        (Projection code adapted from LSCDetection.)

        Args:
            is_len (bool): When True, L2-normalize the vectors before saving.

        Raises:
            ValueError: If the input model, dimensionality or save path
                was not provided to the constructor.
        """
        if self.count_model is None or self.dimensionality is None or self.savepath is None:
            raise ValueError(
                "RandomIndexing requires count_model, dimensionality and savepath "
                "to be set before calling encode()"
            )

        logger.info("Starting RandomIndexing encoding process")
        start_time = time.time()

        # Load input matrix
        countSpace = Space(self.count_model.matrix_path)

        # Project the count matrix onto `dimensionality` sparse random components.
        randomMatrix = SparseRandomProjection(self.dimensionality).fit_transform(countSpace.matrix)

        outSpace = Space(matrix=randomMatrix, rows=countSpace.rows, columns=[])

        if is_len:
            # L2-normalize vectors
            outSpace.l2_normalize()

        # Save the matrix
        outSpace.save(self.savepath, format='w2v')

        logger.info("RandomIndexing encoding completed in %s seconds", time.time() - start_time)