import subprocess
import numpy as np
from abc import ABC, abstractmethod
from typing import List, Union
from languagechange.usages import TargetUsage
from languagechange.corpora import LinebyLineCorpus
from LSCDetection.modules.utils_ import Space
from languagechange.models.representation.static import StaticModel
import os
from LSCDetection.modules import embeddings
from LSCDetection.modules.cupy_utils import *
from LSCDetection.alignment.map_embeddings import dropout, topk_mean
import re
import sys
import collections
import time
import logging
class OrthogonalProcrustes():
    """
    A class to align word embeddings using the Orthogonal Procrustes method.

    This method aligns two embedding spaces by finding an optimal orthogonal
    transformation. The aligned embeddings are written to the two paths given
    at construction time.
    """

    def __init__(self, savepath1: str, savepath2: str):
        """
        Initialize the class with paths to save the aligned embeddings.

        Args:
            savepath1 (str): Path to save the aligned version of the first model.
            savepath2 (str): Path to save the aligned version of the second model.
        """
        self.savepath1 = savepath1
        self.savepath2 = savepath2

    # This function is adapted from https://github.com/Garrafao/LSCDetection/blob/master/alignment/map_embeddings.py
    def align(self, model1: "StaticModel", model2: "StaticModel",
              encoding='utf-8',  # the character encoding for input/output (defaults to utf-8)
              precision='fp32',  # should be in {'fp16','fp32','fp64'}
              cuda=False,  # use cuda (requires cupy)
              batch_size: int = 10000,  # batch size (defaults to 10000); does not affect results, larger is usually faster but uses more memory
              seed=0,  # the random seed (defaults to 0)
              # recommended args
              supervised=None,  # recommended if you have a large training dictionary
              semi_supervised=None,  # recommended if you have a small seed dictionary
              identical=False,  # recommended if you have no seed dictionary but can rely on identical words
              unsupervised=False,  # recommended if you have no seed dictionary and do not want to rely on identical words
              acl2018=False,  # reproduce our ACL 2018 system
              aaai2018=None,  # reproduce our AAAI 2018 system
              acl2017=False,  # reproduce our ACL 2017 system with numeral initialization
              acl2017_seed=None,  # reproduce our ACL 2017 system with a seed dictionary
              emnlp2016=None,  # reproduce our EMNLP 2016 system
              # init args. Below four are mutually exclusive
              init_dictionary=None,  # the training dictionary file (None means stdin, resolved only when it is actually read)
              init_identical=True,  # use identical words as the seed dictionary
              init_numerals=False,  # use latin numerals (i.e. words matching [0-9]+) as the seed dictionary
              init_unsupervised=False,  # recommended if you have no seed dictionary and do not want to rely on identical words
              unsupervised_vocab: int = 0,  # restrict the vocabulary to the top k entries for unsupervised initialization
              # mapping args
              normalize=['unit'],  # the normalization actions to perform in order. Should be a list of {'unit', 'center', 'unitdim', 'centeremb', 'none'}
              whiten=False,  # whiten the embeddings
              src_reweight: float = 0,  # re-weight the source language embeddings
              trg_reweight: float = 0,  # re-weight the target language embeddings
              src_dewhiten=None,  # de-whiten the source language embeddings
              trg_dewhiten=None,  # de-whiten the target language embeddings
              dim_reduction: int = 0,  # apply dimensionality reduction
              # The two arguments below are mutually exclusive
              orthogonal=True,  # use orthogonal constrained mapping
              unconstrained=False,  # use unconstrained mapping
              # self-learning args
              self_learning=False,  # enable self-learning
              vocabulary_cutoff: int = 0,  # restrict the vocabulary to the top k entries
              direction='union',  # the direction for dictionary induction (defaults to union). Choices=['forward', 'backward', 'union']
              csls_neighborhood: int = 0,  # use CSLS for dictionary induction
              threshold: float = 0.000001,  # the convergence threshold (defaults to 0.000001)
              validation=None,  # a dictionary file for validation at each iteration
              stochastic_initial: float = 0.1,  # initial keep probability stochastic dictionary induction (defaults to 0.1)
              stochastic_multiplier: float = 2.0,  # stochastic dictionary induction multiplier (defaults to 2.0)
              stochastic_interval: int = 50,  # stochastic dictionary induction interval (defaults to 50)
              log=None,  # write to a log file in tsv format at each iteration
              verbose=False  # write log information to stderr at each iteration
              ):
        """
        Align two embedding models in-process and write the mapped embeddings to disk.

        The embeddings are read from ``model1.matrix_path`` and
        ``model2.matrix_path``, normalized, mapped into a common space using a
        seed dictionary (optionally refined by self-learning), and written to
        ``self.savepath1`` and ``self.savepath2``.

        Args:
            model1 (StaticModel): The first (source) model; its ``matrix_path`` is read.
            model2 (StaticModel): The second (target) model; its ``matrix_path`` is read.

            The remaining keyword arguments are described next to their
            defaults in the signature. Points that are easy to miss:

            * At most one preset (``supervised``, ``semi_supervised``,
              ``identical``, ``unsupervised``, ``acl2018``, ``aaai2018``,
              ``acl2017``, ``acl2017_seed``, ``emnlp2016``) may be given. A
              preset overwrites the individual options it covers, regardless
              of the values passed for them.
            * Without a preset, the seed dictionary source is chosen with the
              precedence ``init_unsupervised`` > ``init_numerals`` >
              ``init_identical`` > ``init_dictionary``. Because
              ``init_identical`` defaults to True, ``init_dictionary`` is only
              read when ``init_identical=False`` is passed as well.
            * Presets do not reset ``orthogonal``. With the default
              ``orthogonal=True`` the final mapping always takes the
              orthogonal branch, so whitening, re-weighting, de-whitening and
              dimensionality reduction are only applied when
              ``orthogonal=False`` (and ``unconstrained=False``) is passed.

        Raises:
            AssertionError: If an option has an invalid value or more than one
                preset is selected.
            SystemExit: If de-whitening is requested without whitening, the
                precision is unknown, or CUDA is requested without CuPy.

        Side effects:
            Configures the root logger via ``logging.basicConfig``, seeds the
            NumPy/CuPy random generator, and writes the two output files (and
            ``log`` when given).
        """
        # Previously
        #subprocess.run(["python3", "-m", "LSCDetection.alignment.map_embeddings",
        #                "--normalize", "unit",
        #                "--init_identical",
        #                "--orthogonal",
        #                model1.matrix_path,
        #                model2.matrix_path,
        #                self.savepath1,
        #                self.savepath2])
        logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
        logging.info(__file__.upper())
        start_time = time.time()

        # Asserting correct arguments
        assert src_dewhiten in {'src', 'trg', None}
        assert trg_dewhiten in {'src', 'trg', None}
        assert not (orthogonal and unconstrained)
        assert direction in {'forward', 'backward', 'union'}
        # A tuple (not a set) is required here: in a set, several True flags
        # collapse into a single element and the check could never fire.
        presets = (supervised, semi_supervised, identical, unsupervised, acl2018,
                   aaai2018, acl2017, acl2017_seed, emnlp2016)
        assert sum(bool(arg) for arg in presets) <= 1

        # The init flags take precedence over an explicit dictionary. Since
        # init_identical defaults to True, tell the caller when a dictionary
        # they supplied is about to be ignored.
        if init_identical or init_numerals or init_unsupervised:
            if init_dictionary is not None:
                logging.warning('init_dictionary is ignored because init_identical, '
                                'init_numerals or init_unsupervised is set')
            init_dictionary = None
        for e in normalize:
            assert e in {'unit', 'center', 'unitdim', 'centeremb', 'none'}

        # Presets. Those that bring their own dictionary must also clear the
        # init flags, otherwise the default init_identical=True would win in
        # the seed-dictionary selection below and the file would never be read.
        if supervised is not None:
            init_dictionary = supervised
            init_identical = init_numerals = init_unsupervised = False
            normalize = ['unit', 'center', 'unit']
            whiten = True
            src_reweight = 0.5
            trg_reweight = 0.5
            src_dewhiten = 'src'
            trg_dewhiten = 'trg'
            batch_size = 1000
        if semi_supervised is not None:
            init_dictionary = semi_supervised
            init_identical = init_numerals = init_unsupervised = False
            normalize = ['unit', 'center', 'unit']
            whiten = True
            src_reweight = 0.5
            trg_reweight = 0.5
            src_dewhiten = 'src'
            trg_dewhiten = 'trg'
            self_learning = True
            vocabulary_cutoff = 20000
            csls_neighborhood = 10
        if identical:
            init_identical = True
            normalize = ['unit', 'center', 'unit']
            whiten = True
            src_reweight = 0.5
            trg_reweight = 0.5
            src_dewhiten = 'src'
            trg_dewhiten = 'trg'
            self_learning = True
            vocabulary_cutoff = 20000
            csls_neighborhood = 10
        if unsupervised or acl2018:
            init_unsupervised = True
            unsupervised_vocab = 4000
            normalize = ['unit', 'center', 'unit']
            whiten = True
            src_reweight = 0.5
            trg_reweight = 0.5
            src_dewhiten = 'src'
            trg_dewhiten = 'trg'
            self_learning = True
            vocabulary_cutoff = 20000
            csls_neighborhood = 10
        if aaai2018:
            init_dictionary = aaai2018
            init_identical = init_numerals = init_unsupervised = False
            normalize = ['unit', 'center']
            whiten = True
            trg_reweight = 1
            src_dewhiten = 'src'
            trg_dewhiten = 'trg'
            batch_size = 1000
        if acl2017:
            init_numerals = True
            orthogonal = True
            normalize = ['unit', 'center']
            self_learning = True
            direction = 'forward'
            stochastic_initial = 1.0
            stochastic_interval = 1
            batch_size = 1000
        if acl2017_seed:
            init_dictionary = acl2017_seed
            init_identical = init_numerals = init_unsupervised = False
            orthogonal = True
            normalize = ['unit', 'center']
            self_learning = True
            direction = 'forward'
            stochastic_initial = 1.0
            stochastic_interval = 1
            batch_size = 1000
        if emnlp2016:
            init_dictionary = emnlp2016
            init_identical = init_numerals = init_unsupervised = False
            orthogonal = True
            normalize = ['unit', 'center']
            batch_size = 1000

        # Check arguments
        if (src_dewhiten is not None or trg_dewhiten is not None) and not whiten:
            logging.error('ERROR: De-whitening requires whitening first')
            sys.exit(-1)

        # Choose the right dtype for the desired precision
        if precision == 'fp16':
            dtype = 'float16'
        elif precision == 'fp32':
            dtype = 'float32'
        elif precision == 'fp64':
            dtype = 'float64'
        else:
            logging.error("ERROR: Precision needs to be one of ('fp16','fp32','fp64')")
            sys.exit(-1)

        # Read input embeddings (source = model1, target = model2)
        with open(model1.matrix_path, encoding=encoding, errors='surrogateescape') as srcfile:
            src_words, x = embeddings.read(srcfile, dtype=dtype)
        with open(model2.matrix_path, encoding=encoding, errors='surrogateescape') as trgfile:
            trg_words, z = embeddings.read(trgfile, dtype=dtype)

        # NumPy/CuPy management
        if cuda:
            if not supports_cupy():
                logging.error('ERROR: Install CuPy for CUDA support')
                sys.exit(-1)
            xp = get_cupy()
            x = xp.asarray(x)
            z = xp.asarray(z)
        else:
            xp = np
        xp.random.seed(seed)

        # Build word to index map
        src_word2ind = {word: i for i, word in enumerate(src_words)}
        trg_word2ind = {word: i for i, word in enumerate(trg_words)}

        # STEP 0: Normalization
        embeddings.normalize(x, normalize)
        embeddings.normalize(z, normalize)

        # Build the seed dictionary
        src_indices = []
        trg_indices = []
        if init_unsupervised:
            sim_size = min(x.shape[0], z.shape[0]) if unsupervised_vocab <= 0 else min(x.shape[0], z.shape[0], unsupervised_vocab)
            u, s, vt = xp.linalg.svd(x[:sim_size], full_matrices=False)
            xsim = (u*s).dot(u.T)
            u, s, vt = xp.linalg.svd(z[:sim_size], full_matrices=False)
            zsim = (u*s).dot(u.T)
            del u, s, vt
            xsim.sort(axis=1)
            zsim.sort(axis=1)
            embeddings.normalize(xsim, normalize)
            embeddings.normalize(zsim, normalize)
            sim = xsim.dot(zsim.T)
            if csls_neighborhood > 0:
                knn_sim_fwd = topk_mean(sim, k=csls_neighborhood)
                knn_sim_bwd = topk_mean(sim.T, k=csls_neighborhood)
                sim -= knn_sim_fwd[:, xp.newaxis]/2 + knn_sim_bwd/2
            if direction == 'forward':
                src_indices = xp.arange(sim_size)
                trg_indices = sim.argmax(axis=1)
            elif direction == 'backward':
                src_indices = sim.argmax(axis=0)
                trg_indices = xp.arange(sim_size)
            elif direction == 'union':
                src_indices = xp.concatenate((xp.arange(sim_size), sim.argmax(axis=0)))
                trg_indices = xp.concatenate((sim.argmax(axis=1), xp.arange(sim_size)))
            del xsim, zsim, sim
        elif init_numerals:
            numeral_regex = re.compile('^[0-9]+$')
            src_numerals = {word for word in src_words if numeral_regex.match(word) is not None}
            trg_numerals = {word for word in trg_words if numeral_regex.match(word) is not None}
            numerals = src_numerals.intersection(trg_numerals)
            for word in numerals:
                src_indices.append(src_word2ind[word])
                trg_indices.append(trg_word2ind[word])
        elif init_identical:
            # Named differently from the `identical` preset argument so the
            # argument is not overwritten.
            shared_words = set(src_words).intersection(set(trg_words))
            for word in shared_words:
                src_indices.append(src_word2ind[word])
                trg_indices.append(trg_word2ind[word])
        else:
            if init_dictionary is None:
                # Fall back to stdin, resolved here rather than in the
                # signature so importing this module never touches stdin.
                # closefd=False keeps the process's stdin open afterwards.
                dict_file = open(sys.stdin.fileno(), encoding=encoding, errors='surrogateescape', closefd=False)
            else:
                dict_file = open(init_dictionary, encoding=encoding, errors='surrogateescape')
            with dict_file as f:
                for line in f:
                    src, trg = line.split()
                    try:
                        src_ind = src_word2ind[src]
                        trg_ind = trg_word2ind[trg]
                        src_indices.append(src_ind)
                        trg_indices.append(trg_ind)
                    except KeyError:
                        print('WARNING: OOV dictionary entry ({0} - {1})'.format(src, trg), file=sys.stderr)

        # Read validation dictionary
        if validation is not None:
            validation_dict = collections.defaultdict(set)
            oov = set()
            vocab = set()
            with open(validation, encoding=encoding, errors='surrogateescape') as f:
                for line in f:
                    src, trg = line.split()
                    try:
                        src_ind = src_word2ind[src]
                        trg_ind = trg_word2ind[trg]
                        validation_dict[src_ind].add(trg_ind)
                        vocab.add(src)
                    except KeyError:
                        oov.add(src)
            oov -= vocab  # If one of the translation options is in the vocabulary, then the entry is not an oov
            validation_coverage = len(validation_dict) / (len(validation_dict) + len(oov))

        # Allocate memory
        xw = xp.empty_like(x)
        zw = xp.empty_like(z)
        src_size = x.shape[0] if vocabulary_cutoff <= 0 else min(x.shape[0], vocabulary_cutoff)
        trg_size = z.shape[0] if vocabulary_cutoff <= 0 else min(z.shape[0], vocabulary_cutoff)
        simfwd = xp.empty((batch_size, trg_size), dtype=dtype)
        simbwd = xp.empty((batch_size, src_size), dtype=dtype)
        if validation is not None:
            simval = xp.empty((len(validation_dict.keys()), z.shape[0]), dtype=dtype)

        best_sim_forward = xp.full(src_size, -100, dtype=dtype)
        src_indices_forward = xp.arange(src_size)
        trg_indices_forward = xp.zeros(src_size, dtype=int)
        best_sim_backward = xp.full(trg_size, -100, dtype=dtype)
        src_indices_backward = xp.zeros(trg_size, dtype=int)
        trg_indices_backward = xp.arange(trg_size)
        knn_sim_fwd = xp.zeros(src_size, dtype=dtype)
        knn_sim_bwd = xp.zeros(trg_size, dtype=dtype)

        def whitening_transformation(m):
            u, s, vt = xp.linalg.svd(m, full_matrices=False)
            return vt.T.dot(xp.diag(1/s)).dot(vt)

        # Create log file. A single handle (or None) is used for every later
        # check so that opening and writing can never disagree.
        log_file = open(log, mode='w', encoding=encoding, errors='surrogateescape') if log else None
        try:
            # Training loop
            best_objective = objective = -100.
            it = 1
            last_improvement = 0
            keep_prob = stochastic_initial
            t = time.time()
            end = not self_learning
            while True:

                # Increase the keep probability if we have not improve in stochastic_interval iterations
                if it - last_improvement > stochastic_interval:
                    if keep_prob >= 1.0:
                        end = True
                    keep_prob = min(1.0, stochastic_multiplier*keep_prob)
                    last_improvement = it

                # Update the embedding mapping
                if orthogonal or not end:  # orthogonal mapping
                    u, s, vt = xp.linalg.svd(z[trg_indices].T.dot(x[src_indices]))
                    w = vt.T.dot(u.T)
                    x.dot(w, out=xw)
                    zw[:] = z
                elif unconstrained:  # unconstrained mapping
                    x_pseudoinv = xp.linalg.inv(x[src_indices].T.dot(x[src_indices])).dot(x[src_indices].T)
                    w = x_pseudoinv.dot(z[trg_indices])
                    x.dot(w, out=xw)
                    zw[:] = z
                else:  # advanced mapping

                    # TODO xw.dot(wx2, out=xw) and alike not working
                    xw[:] = x
                    zw[:] = z

                    # STEP 1: Whitening
                    if whiten:
                        wx1 = whitening_transformation(xw[src_indices])
                        wz1 = whitening_transformation(zw[trg_indices])
                        xw = xw.dot(wx1)
                        zw = zw.dot(wz1)

                    # STEP 2: Orthogonal mapping
                    wx2, s, wz2_t = xp.linalg.svd(xw[src_indices].T.dot(zw[trg_indices]))
                    wz2 = wz2_t.T
                    xw = xw.dot(wx2)
                    zw = zw.dot(wz2)

                    # STEP 3: Re-weighting
                    xw *= s**src_reweight
                    zw *= s**trg_reweight

                    # STEP 4: De-whitening
                    if src_dewhiten == 'src':
                        xw = xw.dot(wx2.T.dot(xp.linalg.inv(wx1)).dot(wx2))
                    elif src_dewhiten == 'trg':
                        xw = xw.dot(wz2.T.dot(xp.linalg.inv(wz1)).dot(wz2))
                    if trg_dewhiten == 'src':
                        zw = zw.dot(wx2.T.dot(xp.linalg.inv(wx1)).dot(wx2))
                    elif trg_dewhiten == 'trg':
                        zw = zw.dot(wz2.T.dot(xp.linalg.inv(wz1)).dot(wz2))

                    # STEP 5: Dimensionality reduction
                    if dim_reduction > 0:
                        xw = xw[:, :dim_reduction]
                        zw = zw[:, :dim_reduction]

                # Self-learning
                if end:
                    break
                else:
                    # Update the training dictionary
                    if direction in ('forward', 'union'):
                        if csls_neighborhood > 0:
                            for i in range(0, trg_size, simbwd.shape[0]):
                                j = min(i + simbwd.shape[0], trg_size)
                                zw[i:j].dot(xw[:src_size].T, out=simbwd[:j-i])
                                knn_sim_bwd[i:j] = topk_mean(simbwd[:j-i], k=csls_neighborhood, inplace=True)
                        for i in range(0, src_size, simfwd.shape[0]):
                            j = min(i + simfwd.shape[0], src_size)
                            xw[i:j].dot(zw[:trg_size].T, out=simfwd[:j-i])
                            simfwd[:j-i].max(axis=1, out=best_sim_forward[i:j])
                            simfwd[:j-i] -= knn_sim_bwd/2  # Equivalent to the real CSLS scores for NN
                            dropout(simfwd[:j-i], 1 - keep_prob).argmax(axis=1, out=trg_indices_forward[i:j])
                    if direction in ('backward', 'union'):
                        if csls_neighborhood > 0:
                            for i in range(0, src_size, simfwd.shape[0]):
                                j = min(i + simfwd.shape[0], src_size)
                                xw[i:j].dot(zw[:trg_size].T, out=simfwd[:j-i])
                                knn_sim_fwd[i:j] = topk_mean(simfwd[:j-i], k=csls_neighborhood, inplace=True)
                        for i in range(0, trg_size, simbwd.shape[0]):
                            j = min(i + simbwd.shape[0], trg_size)
                            zw[i:j].dot(xw[:src_size].T, out=simbwd[:j-i])
                            simbwd[:j-i].max(axis=1, out=best_sim_backward[i:j])
                            simbwd[:j-i] -= knn_sim_fwd/2  # Equivalent to the real CSLS scores for NN
                            dropout(simbwd[:j-i], 1 - keep_prob).argmax(axis=1, out=src_indices_backward[i:j])
                    if direction == 'forward':
                        src_indices = src_indices_forward
                        trg_indices = trg_indices_forward
                    elif direction == 'backward':
                        src_indices = src_indices_backward
                        trg_indices = trg_indices_backward
                    elif direction == 'union':
                        src_indices = xp.concatenate((src_indices_forward, src_indices_backward))
                        trg_indices = xp.concatenate((trg_indices_forward, trg_indices_backward))

                    # Objective function evaluation
                    if direction == 'forward':
                        objective = xp.mean(best_sim_forward).tolist()
                    elif direction == 'backward':
                        objective = xp.mean(best_sim_backward).tolist()
                    elif direction == 'union':
                        objective = (xp.mean(best_sim_forward) + xp.mean(best_sim_backward)).tolist() / 2
                    if objective - best_objective >= threshold:
                        last_improvement = it
                        best_objective = objective

                    # Accuracy and similarity evaluation in validation
                    if validation is not None:
                        src = list(validation_dict.keys())
                        xw[src].dot(zw.T, out=simval)
                        nn = asnumpy(simval.argmax(axis=1))
                        accuracy = np.mean([1 if nn[i] in validation_dict[src[i]] else 0 for i in range(len(src))])
                        similarity = np.mean([max([simval[i, j].tolist() for j in validation_dict[src[i]]]) for i in range(len(src))])

                    # Logging
                    duration = time.time() - t
                    if verbose:
                        print(file=sys.stderr)
                        print('ITERATION {0} ({1:.2f}s)'.format(it, duration), file=sys.stderr)
                        print('\t- Objective: {0:9.4f}%'.format(100 * objective), file=sys.stderr)
                        print('\t- Drop probability: {0:9.4f}%'.format(100 - 100*keep_prob), file=sys.stderr)
                        if validation is not None:
                            print('\t- Val. similarity: {0:9.4f}%'.format(100 * similarity), file=sys.stderr)
                            print('\t- Val. accuracy: {0:9.4f}%'.format(100 * accuracy), file=sys.stderr)
                            print('\t- Val. coverage: {0:9.4f}%'.format(100 * validation_coverage), file=sys.stderr)
                        sys.stderr.flush()
                    if log_file is not None:
                        val = '{0:.6f}\t{1:.6f}\t{2:.6f}'.format(
                            100 * similarity, 100 * accuracy, 100 * validation_coverage) if validation is not None else ''
                        print('{0}\t{1:.6f}\t{2}\t{3:.6f}'.format(it, 100 * objective, val, duration), file=log_file)
                        log_file.flush()

                t = time.time()
                it += 1
        finally:
            if log_file is not None:
                log_file.close()

        # Write mapped embeddings
        with open(self.savepath1, mode='w', encoding=encoding, errors='surrogateescape') as srcfile:
            embeddings.write(src_words, xw, srcfile)  # the output source embeddings
        with open(self.savepath2, mode='w', encoding=encoding, errors='surrogateescape') as trgfile:
            embeddings.write(trg_words, zw, trgfile)  # the output target embeddings

        logging.info("--- %s seconds ---" % (time.time() - start_time))