import bz2
import gzip
import logging
import os
import re
from typing import List, Pattern, Self, Union
import lxml.etree as ET
import trankit
from languagechange.resource_manager import LanguageChange
from languagechange.search import SearchTerm
from languagechange.usages import TargetUsage, TargetUsageList, UsageDictionary
from languagechange.utils import LiteralTime
from sortedcontainers import SortedKeyList
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
[docs]
class Line:
def __init__(self,
raw_text=None,
tokens=None,
lemmas=None,
pos_tags=None,
fname=None,
raw_lemma_text=None,
raw_pos_text = None,
**kwargs,
):
self._raw_text = raw_text
self._raw_lemma_text = raw_lemma_text
self._raw_pos_text = raw_pos_text
self._tokens = tokens
self._lemmas = lemmas
self._pos_tags = pos_tags
self._fname = fname
self.__dict__.update(kwargs)
[docs]
def tokens(self):
if not self._tokens == None:
return self._tokens
else:
return self._lemmas
[docs]
def lemmas(self):
return self._lemmas
[docs]
def tokens_by_feature(self, feat = str):
if feat == 'token':
return self.tokens()
elif feat == 'lemma':
return self.lemmas()
elif feat == 'pos':
return self.pos_tags()
else:
raise ValueError(f"'{feat}' is not a valid word feature")
[docs]
def raw_text(self):
if not self._raw_text == None:
return self._raw_text
else:
if not self._tokens == None:
return ' '.join(self._tokens)
elif not self._lemmas == None:
return ' '.join(self._lemmas)
else:
raise Exception('No valid data in Line')
[docs]
def raw_lemma_text(self):
if not self._raw_lemmas == None:
return self._raw_lemmas
return ' '.join(self._lemmas)
[docs]
def raw_pos_text(self):
if not self._raw_pos_text == None:
return self._raw_pos_text
return ' '.join(self._raw_pos_text)
[docs]
def raw_text_by_feature(self, feat = 'token'):
if feat == 'token':
return self.raw_text()
elif feat == 'lemma':
return self.raw_lemma_text()
elif feat == 'pos':
return self.raw_pos_text()
else:
raise ValueError(f"'{feat}' is not a valid word feature")
[docs]
def search(self, search_term : SearchTerm, time = None) -> TargetUsageList:
"""
Searches the line given a search_term.
Args:
search_term : SearchTerm
Returns: A TargetUsageList of all matches.
"""
time = getattr(self, 'date', time)
tul = TargetUsageList()
for feat in search_term.word_feature:
if search_term.regex:
if search_term.search_func:
def search_func(word, line):
offsets = []
rex = re.compile(f'( |^)+{word}( |$)+',re.MULTILINE)
for fi in re.finditer(rex, line):
s = line[fi.start():fi.end()].find(word)
offsets.append([fi.start()+s, fi.start()+s+len(word)])
return offsets
raw_text_by_feature = self.raw_text_by_feature(feat)
for offsets in search_func(search_term.term, raw_text_by_feature):
tu = TargetUsage(self.raw_text(), offsets, time, id=getattr(self, 'id', 0))
tul.append(tu)
else:
token_features = self.tokens_by_feature(feat)
for idx, token in enumerate(token_features):
if search_term.term == token:
offsets = [0,0]
if not idx == 0:
offsets[0] = len(' '.join(self.tokens()[:idx])) + 1
offsets[1] = offsets[0] + len(self.tokens()[idx])
tu = TargetUsage(self.raw_text(), offsets, time, id=getattr(self, 'id', 0))
tul.append(tu)
return tul
def __str__(self):
return self._raw_text
[docs]
class Corpus:
def __init__(self, name, language=None, time=LiteralTime('no time specification'), time_function = None, skip_lines=0, **args):
self.name = name
self.language = language
if time_function is not None and callable(time_function):
self.time = time_function(self)
elif hasattr(self,'extract_dates') and callable(self.extract_dates):
self.time = self.extract_dates()
else:
self.time = time
self.skip_lines = skip_lines
[docs]
def set_sentences_iterator(self, sentences):
self.sentences_iterator = sentences
[docs]
def search(self,
search_terms: List[ str | Pattern | SearchTerm ]
) -> UsageDictionary:
"""
Searches through the corpora by calling Line.search() on all lines.
Args:
search_terms : List[ str | Pattern | SearchTerm ]
If a search term is str or Pattern it is converted
to a SearchTerm and matches tokens only
SearchTerm(word_feature = 'token').
Returns: A UsageDictionary containing all search results for each search term.
"""
usage_dictionary = UsageDictionary()
n_usages = 0
for st in search_terms:
if not isinstance(st, SearchTerm):
st = SearchTerm(st, regex = True if isinstance(st, Pattern) else False)
tul = TargetUsageList()
usage_dictionary[st.term] = tul
for line in self.line_iterator():
match : List[TargetUsage] = line.search(st, time = self.time)
tul.extend(match)
n_usages += len(match)
logging.info(f"{n_usages} usages found.")
return usage_dictionary
[docs]
def tokenize(self, tokenizer = "trankit", split_sentences=False, batch_size=128):
if tokenizer == "trankit":
p = trankit.Pipeline(self.language)
if split_sentences:
def process_lines(texts):
tokenized = p.tokenize(' '.join(texts))
for sentence in tokenized['sentences']:
yield Line(raw_text=sentence['text'], tokens=[token['text'] for token in sentence['tokens']])
texts = []
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
texts.append(text)
if len(texts) == batch_size:
for line in process_lines(texts):
yield line
texts = []
if texts != []:
for line in process_lines(texts):
yield line
else:
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
tokenized_sentence = p.tokenize(text, is_sent=True)
line._tokens = [token['text'] for token in tokenized_sentence['tokens']]
yield line
else:
if hasattr(tokenizer, "tokenize") and callable(getattr(tokenizer,"tokenize")):
tokenizer = tokenizer.tokenize
if callable(tokenizer):
try:
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
line._tokens = [str(token) for token in tokenizer(text)]
yield line
except Exception:
logging.error(f"Could not use tokenizer {tokenizer} directly as a function to tokenize.")
[docs]
def lemmatize(self, lemmatizer = "trankit", pretokenized = False, tokenize = False, split_sentences = False, batch_size=128):
if lemmatizer == "trankit":
p = trankit.Pipeline(self.language)
# input which is not sentence split
if split_sentences:
def process_texts(texts):
lemmatized = p.lemmatize(' '.join(texts))
lines = []
for sentence in lemmatized['sentences']:
lines.append(Line(raw_text=sentence['text'], lemmas=[token['lemma'] for token in sentence['tokens']], tokens=[token['text'] for token in sentence['tokens']] if tokenize else None))
return lines
texts = []
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
texts.append(text)
if len(texts) == batch_size:
for line in process_texts(texts):
yield line
texts = []
if texts != []:
for line in process_texts(texts):
yield line
# input which is not pretokenized, but each line is its own sentence
elif not pretokenized:
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
lemmatized_sentence = p.lemmatize(text, is_sent = True)
line._lemmas = [token['lemma'] for token in lemmatized_sentence['tokens']]
yield line
# pretokenized input, one or more sentences at a time
else:
def modify_lines(lines):
lemmatized = p.lemmatize([line.tokens() for line in lines])
lemmatized_sentences = lemmatized['sentences']
for i, line in enumerate(lines):
line._lemmas = [token['lemma'] for token in lemmatized_sentences[i]['tokens']]
yield line
lines = []
for line in self.line_iterator():
tokens = line.tokens()
if type(tokens) == list and len(tokens) > 0:
lines.append(line)
if len(lines) == batch_size:
for line in modify_lines(lines):
yield line
lines = []
if lines != []:
for line in modify_lines(lines):
yield line
# todo: add other lemmatizers if needed
else:
if hasattr(lemmatizer, "lemmatize") and callable(getattr(lemmatizer,"lemmatize")):
lemmatizer = lemmatizer.lemmatize
if callable(lemmatizer):
try:
if pretokenized:
for line in self.line_iterator():
tokens = line.tokens()
if type(tokens) == list and len(tokens) != 0:
line._lemmas = [str(lemma) for lemma in lemmatizer(tokens)]
yield line
else:
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
line._lemmas = [str(lemma) for lemma in lemmatizer(text)]
yield line
except Exception:
logging.error(f"Could not use method {lemmatizer} directly as a function to lemmatize.")
[docs]
def pos_tagging(self, pos_tagger = "trankit", pretokenized = False, tokenize=False, split_sentences = False, batch_size=128):
if pos_tagger == "trankit":
p = trankit.Pipeline(self.language)
# input which is not sentence split
if split_sentences:
def process_texts(texts):
pos_tagged = p.posdep(' '.join(texts))
for sentence in pos_tagged['sentences']:
yield Line(raw_text=sentence['text'], pos_tags=[token['upos'] for token in sentence['tokens']], tokens=[token['text'] for token in sentence['tokens']] if tokenize else None)
texts = []
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
texts.append(text)
if len(texts) == batch_size:
for line in process_texts(texts):
yield line
texts = []
if texts != []:
for line in process_texts(texts):
yield line
# input which is not pretokenized, but each line is its own sentence
elif not pretokenized:
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
pos_tagged_sentence = p.posdep(text, is_sent = True)
line._pos_tags = [token['upos'] for token in pos_tagged_sentence['tokens']]
if tokenize:
line._tokens = [token['text'] for token in pos_tagged_sentence['tokens']]
yield line
# pretokenized input, one or more sentences at a time
else:
def modify_lines(lines):
pos_tagged = p.posdep([line.tokens() for line in lines])
pos_tagged_sentences = pos_tagged['sentences']
for i, line in enumerate(lines):
line._pos_tags = [token['upos'] for token in pos_tagged_sentences[i]['tokens']]
yield line
lines = []
for line in self.line_iterator():
tokens = line.tokens()
if type(tokens) == list and len(tokens) > 0:
lines.append(line)
if len(lines) == batch_size:
for line in modify_lines(lines):
yield line
lines = []
if lines != []:
for line in modify_lines(lines):
yield line
else:
if hasattr(pos_tagger, "pos_tag") and callable(getattr(pos_tagger,"pos_tag")):
pos_tagger = pos_tagger.pos_tag
if callable(pos_tagger):
try:
if pretokenized:
for line in self.line_iterator():
tokens = line.tokens()
if type(tokens) == list and len(tokens) > 0:
line._pos_tags = [str(pos_tag) for pos_tag in pos_tagger(tokens)]
yield line
else:
for line in self.line_iterator():
text = line.raw_text()
if type(text) == str and len(text.strip()) > 0:
line._pos_tags = [str(pos_tag) for pos_tag in pos_tagger(text)]
yield line
except Exception:
logging.error(f"Could not use method {pos_tagger} directly as a function to perform POS tagging.")
# preliminary function
[docs]
def segment_sentences(self, segmentizer = "trankit", batch_size=128):
if segmentizer == "trankit":
p = trankit.Pipeline(self.language)
lines = []
for line in self.line_iterator():
lines.append(line.raw_text())
if len(lines) == batch_size:
sentences = p.ssplit(' '.join(lines))
for sent in sentences['sentences']:
yield Line(sent['text'])
lines = []
if len(lines) != 0:
sentences = p.ssplit(' '.join(lines))
for sent in sentences['sentences']:
yield Line(sent['text'])
elif callable(segmentizer):
try:
lines = []
for line in self.line_iterator():
lines.append(line.raw_text())
if len(lines) == batch_size:
sentences = segmentizer(' '.join(lines))
for sent in sentences:
yield Line(sent)
lines = []
if len(lines) != 0:
sentences = segmentizer(' '.join(lines))
for sent in sentences:
yield Line(sent)
except:
logging.info(f"ERROR: Could not use method {segmentizer} directly as a function to split sentences.")
[docs]
def folder_iterator(self, path):
fnames = []
for fname in os.listdir(path):
if os.path.isdir(os.path.join(path,fname)):
fnames = fnames + self.folder_iterator(os.path.join(path,fname))
else:
fnames.append(os.path.join(path,fname))
return fnames
[docs]
def cast_to_vertical(corpora, vertical_corpus):
line_iterators = [corpus.line_iterator() for corpus in corpora]
iterate = True
with open(vertical_corpus.path,'w+') as f:
while iterate:
lines = []
for iterator in line_iterators:
next_line = next(iterator)
if not next_line == None:
vertical_lines = []
for j in range(len(lines[0])):
vertical_lines.append('{vertical_corpus.field_separator}'.join([lines[i][j] for i in range(len(lines))]))
for line in vertical_lines:
f.write(line+'\n')
f.write(vertical_corpus.sentence_separator)
else:
iterate = False
[docs]
def save(self):
lc = LanguageChange()
lc.save_resource('corpus',f'{self.language} corpora',self.name)
[docs]
def save_tokenized_corpora(corpora : Union[Self, List[Self]], tokens = True, lemmas = False, pos = False, save_format = 'linebyline', file_specification = None, file_ending = ".txt", tokenizer="trankit", lemmatizer="trankit", pos_tagger="trankit", split_sentences = True, batch_size=128):
if not type(corpora) is list:
corpora = [corpora]
if file_specification == None:
file_specification = ""
file_specification += "-tokens" if tokens else ''
file_specification += '-lemmas' if lemmas else ''
file_specification += '-pos' if pos else ''
for corpus in corpora:
tokenized_name = os.path.splitext(corpus.path)[0]+file_specification+file_ending
with open(tokenized_name, 'w+') as f: # cache is probably needed here because the file might already exist.
if save_format == 'linebyline':
if tokens:
for line in corpus.tokenize(tokenizer, split_sentences=split_sentences, batch_size=batch_size):
f.write(' '.join(line.tokens())+'\n')
elif lemmas:
for line in corpus.lemmatize(lemmatizer, split_sentences=split_sentences, batch_size=batch_size):
f.write(' '.join(line.lemmas())+'\n')
elif pos:
for line in corpus.pos_tagging(pos_tagger,split_sentences=split_sentences, batch_size=batch_size):
f.write(' '.join(line.pos_tags())+'\n')
elif save_format == 'vertical':
def write_vertical_line(fields):
fields = [f for f in fields if f is not None]
for tup in zip(*fields):
f.write('\t'.join(tup) + '\n')
f.write('\n')
if lemmas:
if pos:
# tokens_lemmas_pos (with or without tokens)
for line in corpus.tokens_lemmas_pos_tags(tokenizer, tokens=tokens,split_sentences=split_sentences, batch_size=batch_size):
write_vertical_line([line.tokens(), line.lemmas(), line.pos_tags()])
else:
# lemmatize (with or without tokens)
for line in corpus.lemmatize(lemmatizer, tokenize=tokens,split_sentences=split_sentences, batch_size=batch_size):
write_vertical_line([line.tokens(), line.lemmas(), line.pos_tags()])
elif pos:
# pos_tagging (with or without tokens)
for line in corpus.pos_tagging(pos_tagger, tokenize=tokens, split_sentences=split_sentences, batch_size=batch_size):
write_vertical_line([line.tokens(), line.lemmas(), line.pos_tags()])
elif tokens:
# tokenize only
for line in corpus.tokenize(tokenizer,split_sentences=split_sentences, batch_size=batch_size):
write_vertical_line([line.tokens(), line.lemmas(), line.pos_tags()])
[docs]
class LinebyLineCorpus(Corpus):
def __init__(self, path, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = path
super().__init__(**kwargs)
self.path = path
if 'is_sentence_tokenized' in kwargs:
self.is_sentence_tokenized = kwargs['is_sentence_tokenized']
else:
self.is_sentence_tokenized = False
if self.is_sentence_tokenized:
if 'is_tokenized' in kwargs:
self.is_tokenized = kwargs['is_tokenized']
else:
if 'is_tokenized' in kwargs and kwargs['is_tokenized']:
self.is_sentence_tokenized = True
self.is_tokenized = True
else:
self.is_sentence_tokenized = False
self.is_tokenized = False
if 'is_tokenized' in kwargs and kwargs['is_tokenized']:
if 'is_lemmatized' in kwargs:
self.is_lemmatized = kwargs['is_lemmatized']
if 'tokens_splitter' in kwargs:
self.tokens_splitter = kwargs.tokens_splitter
else:
self.tokens_splitter = ' '
else:
if 'is_lemmatized' in kwargs and kwargs['is_lemmatized']:
self.is_sentence_tokenized = True
self.is_tokenized = True
self.is_lemmatized = True
if 'tokens_splitter' in kwargs:
self.tokens_splitter = kwargs.tokens_splitter
else:
self.tokens_splitter = ' '
else:
self.is_lemmatized = False
[docs]
def line_iterator(self):
if os.path.isdir(self.path):
fnames = self.folder_iterator(self.path)
else:
fnames = [self.path]
def get_data(line):
line = line.replace('\n','')
data = {}
data['raw_text'] = line
if self.is_lemmatized:
data['lemmas'] = line.split(self.tokens_splitter)
elif self.is_tokenized:
data['tokens'] = line.split(self.tokens_splitter)
return data
for fname in fnames:
if fname.endswith('.txt'):
with open(fname,'r') as f:
for i, line in enumerate(f):
if i >= self.skip_lines:
data = get_data(line)
yield Line(fname=fname, **data)
elif fname.endswith('.gz'):
with gzip.open(fname, mode="rt") as f:
for i, line in enumerate(f):
if i >= self.skip_lines:
data = get_data(line)
yield Line(fname=fname, **data)
else:
raise Exception('Format not recognized')
[docs]
class VerticalCorpus(Corpus):
def __init__(self, path, sentence_separator='\n', field_separator='\t', field_map={'token':0, 'lemma':1, 'pos_tag':2}, **args):
super().__init__(name=path,**args)
self.path = path
self.sentence_separator = sentence_separator
self.field_separator = field_separator
self.field_map = field_map
[docs]
def line_iterator(self):
if os.path.isdir(self.path):
fnames = self.folder_iterator(self.path)
else:
fnames = [self.path]
def get_data(line):
data = {}
splitted_line = [vertical_line.strip('\n').split(self.field_separator) for vertical_line in line]
raw_text = [vertical_line[self.field_map['token']] for vertical_line in splitted_line]
data['raw_text'] = ' '.join(raw_text)
data['tokens'] = raw_text
if 'lemma' in self.field_map:
lemma_text = [vertical_line[self.field_map['lemma']] for vertical_line in splitted_line]
data['lemmas'] = lemma_text
if 'pos_tag' in self.field_map:
pos_text = [vertical_line[self.field_map['pos_tag']] for vertical_line in splitted_line]
data['pos_tags'] = pos_text
return data
for fname in fnames:
if fname.endswith('.txt'):
with open(fname,'r') as f:
line = []
for i, vertical_line in enumerate(f):
if i >= self.skip_lines:
if vertical_line == self.sentence_separator:
data = get_data(line)
yield Line(fname=fname, **data)
line = []
else:
line.append(vertical_line)
elif fname.endswith('.gz'):
with gzip.open(fname, mode="rt") as f:
for i, vertical_line in enumerate(f):
if i >= self.skip_lines:
if vertical_line == self.sentence_separator:
data = get_data(line)
yield Line(fname=fname, **data)
line = []
else:
line.append(vertical_line)
else:
raise Exception('Format not recognized')
# Should be able to load and parse a corpus in XML format.
# Supports only tokenized corpora so far.
[docs]
class XMLCorpus(Corpus):
def __init__(self, path, sentence_tag='sentence', token_tag='token', is_lemmatized=False, lemma_tag=None, is_pos_tagged=False, pos_tag_tag=None, text_tag='text', **args):
if not 'name' in args:
name = path
super().__init__(name, **args)
self.path = path
if lemma_tag:
self.lemma_tag = lemma_tag
else:
self.lemma_tag = ''
if is_lemmatized:
self.is_lemmatized = True
if lemma_tag != '':
self.lemma_tag = lemma_tag
else:
self.lemma_tag = 'lemma'
else:
self.is_lemmatized = False
self.lemma_tag = ''
if pos_tag_tag:
self.pos_tag_tag = pos_tag_tag
else:
self.pos_tag_tag = ''
if is_pos_tagged:
self.is_pos_tagged = True
if pos_tag_tag != '':
self.pos_tag_tag = pos_tag_tag
else:
self.pos_tag_tag = 'pos'
else:
self.is_pos_tagged = False
self.pos_tag_tag = ''
self.sentence_tag = sentence_tag
self.token_tag = token_tag
self.text_tag = text_tag
[docs]
def get_attribute(self, tag, attribute):
return tag.attrib[attribute]
[docs]
def line_iterator(self):
if os.path.isdir(self.path):
fnames = self.folder_iterator(self.path)
else:
fnames = [self.path]
def get_data(tokens, lemmas = [], pos_tags = []):
data = {}
data['raw_text'] = ' '.join(tokens)
if self.is_lemmatized and lemmas != []:
data['lemmas'] = lemmas
if self.is_pos_tagged and pos_tags != []:
data['pos_tags'] = pos_tags
data['tokens'] = tokens
return data
def read_xml(source):
tokens = []
lemmas = []
parser = ET.iterparse(source, events=('start','end'))
sentence_counter = 0
for event, elem in parser:
if elem.sourceline >= self.skip_lines:
if elem.tag == self.text_tag:
date = elem.get('date')
if elem.tag == self.sentence_tag:
if event == 'start':
tokens = []
lemmas = []
pos_tags = []
# If the sentence has ended, create a new Line object with its content
elif event == 'end':
if tokens != []:
data = get_data(tokens, lemmas, pos_tags)
data['date'] = date
line_id = elem.get('id', sentence_counter)
data['id'] = line_id
yield Line(fname=fname, **data)
elem.clear()
sentence_counter += 1
elif elem.tag == self.token_tag:
if event == 'end':
if self.is_lemmatized:
lemma = self.get_attribute(elem, self.lemma_tag)
lemmas.append(lemma)
if self.is_pos_tagged:
pos_tag = self.get_attribute(elem, self.pos_tag_tag)
pos_tags.append(pos_tag)
token = elem.text
tokens.append(token)
elem.clear()
else:
if event == 'end':
elem.clear()
for fname in fnames:
if fname.endswith('.xml'):
for l in read_xml(fname):
yield l
elif fname.endswith('.xml.bz2'):
with bz2.open(fname, 'r') as f:
for l in read_xml(f):
yield l
else:
raise Exception('Format not recognized')
# Cast to a LineByLine corpus and save the result in the path specified in there
[docs]
def cast_to_linebyline(self, linebyline_corpus : LinebyLineCorpus):
savepath = linebyline_corpus.path
if hasattr(linebyline_corpus, 'tokens_splitter'):
tokens_splitter = linebyline_corpus.tokens_splitter
else:
tokens_splitter = ' '
tokenized = linebyline_corpus.is_tokenized
lemmatized = linebyline_corpus.is_lemmatized
if lemmatized and not self.is_lemmatized:
logging.info('ERROR: cannot cast to lemmatized LinebyLineCorpus because this XMLCorpus is not lemmatized.')
return None
with open(savepath, 'w+') as f:
if lemmatized:
for line in self.line_iterator():
f.write(tokens_splitter.join(line.lemmas())+'\n') # cache needed here
elif tokenized:
for line in self.line_iterator():
f.write(tokens_splitter.join(line.tokens())+'\n') # cache needed here
else:
for line in self.line_iterator():
f.write(line.raw_text()+'\n') # cache needed here
[docs]
def cast_to_vertical(self, vertical_corpus : VerticalCorpus):
savepath = vertical_corpus.path
field_separator = vertical_corpus.field_separator
sentence_separator = vertical_corpus.sentence_separator
# We need to make sure that the line features (token, lemma, pos, etc.) come in the same order as in the field_map in the vertical_corpus
sorted_field_names = [key for (key, _) in sorted(vertical_corpus.field_map.items(), key = lambda x : x[1])]
def get_line_feature(line, key):
field_name_to_line_feature = {'token': line.tokens, 'lemma': line.lemmas, 'pos_tag': line.pos_tags}
return field_name_to_line_feature[key]()
with open(savepath,'w+') as f:
for line in self.line_iterator():
for t in zip(*(get_line_feature(line, key) for key in sorted_field_names)):
f.write(field_separator.join(list(t))+'\n') # cache needed here
f.write(sentence_separator) # cache needed here
# A class for handling XML corpora specifically from spraakbanken.gu.se
[docs]
class SprakBankenCorpus(XMLCorpus):
def __init__(self, path, sentence_tag='sentence',token_tag='token', is_lemmatized=True, lemma_tag='lemma', is_pos_tagged=True, pos_tag_tag='pos', **args):
super().__init__(path, sentence_tag, token_tag, is_lemmatized, lemma_tag, is_pos_tagged, pos_tag_tag, **args)
[docs]
def get_attribute(self, tag, attribute):
content = tag.attrib[attribute]
if content != None:
if attribute == self.lemma_tag:
content = content.strip("|").split("|")
if content != ['']:
return content[0]
else:
return content
return tag.text
[docs]
class HistoricalCorpus(SortedKeyList):
def __new__(cls, *args, **kwargs):
"""Ensures only valid arguments go to SortedKeyList"""
return super().__new__(cls)
def __init__(self, corpora:Union[List[Corpus],str], key=lambda c : c.time, corpus_type=None, time_function=None):
"""
This class is a SortedKeyList of corpora. A historical corpus can be initialised either from a path where the files are located, or from a list of already instanciated Corpus objects.
Args:
corpora ([Corpus]|str): a list of corpora or a path where the corpora are stored.
key (function, default = lambda c : c.time): the key by which the corpora are sorted. Default sorting is by time, in ascending order
corpus_type (str, default=None): the kind of corpus. Needs to be provided if initalising from a folder, and then needs to be one of 'line_by_line','vertical','xml', and 'sprakbanken'.
time_function (function, default = None): the function used to extract a time value for each corpus. Needed if initialising from a folder.
"""
if isinstance(corpora, str):
try:
if corpus_type not in ['line_by_line','vertical','xml','sprakbanken']:
logging.error("When initialising from a folder path, corpus_type must be one of 'line_by_line','vertical','xml' and 'sprakbanken'.")
raise ValueError
corpora_list = []
for file in os.listdir(corpora):
try:
if corpus_type == 'line_by_line':
corpus = LinebyLineCorpus(os.path.join(corpora,file),time_function=time_function)
elif corpus_type == 'vertical':
corpus = VerticalCorpus(os.path.join(corpora,file),time_function=time_function)
elif corpus_type == 'xml':
corpus = XMLCorpus(os.path.join(corpora,file),time_function=time_function)
elif corpus_type == 'sprakbanken':
corpus = SprakBankenCorpus(os.path.join(corpora,file),time_function=time_function)
corpora_list.append(corpus)
except: #TODO: proper exception
logging.error(f"Could not initialise a corpus from path {os.path.join(dir,file)}.")
continue
corpora = corpora_list
except:
logging.error(f"Could not use path {corpora} to intitialize corpora.")
raise Exception
elif isinstance(corpora, list):
for corpus in corpora:
if not isinstance(corpus, Corpus):
logging.error("Every element in 'corpora' needs to be a Corpus object.")
raise Exception
else:
logging.error("'corpora' needs to be either a string or a list of Corpus objects.")
raise Exception
super().__init__(corpora, key)
[docs]
def line_iterator(self):
"""
Iterates through all of the corpora, and yields all of the lines that are possible to get.
"""
for corpus in self:
try:
for line in corpus.line_iterator():
yield line
except:
logging.error(f"Could not get lines from {corpus.name}.")
[docs]
def search(self, search_terms : List[ str | Pattern | SearchTerm ], index_by_corpus=False):
"""
Searches through all of the corpora by calling search() for each of them.
Args:
search_terms : List[ str | Pattern | SearchTerm ]
If search term is str or Pattern it is converted
to a SearchTerm and matches tokens only
SearchTerm(word_feature = 'token').
index_by_corpus : bool, default False
decides whether the usages for a given word should be a dictionary,
with keys as the corpus names and values as lists of usages, or a list
of all usages across corpora.
Returns: a dictionary containing all search results from the included corpora.
"""
if index_by_corpus:
usages = {} #TODO: make this saveable
for corpus in self:
try:
usage_dict : UsageDictionary = corpus.search(search_terms)
except:
logging.error(f"Could not search through {corpus.name}.")
continue
for key in usage_dict:
if not key in usages:
usages[key] = {corpus.name : TargetUsageList()}
usages[key][corpus.name] = usage_dict[key]
else:
usages = UsageDictionary()
for corpus in self:
try:
usage_dict : UsageDictionary = corpus.search(search_terms)
except:
logging.error(f"Could not search through {corpus.name}.")
continue
for key in usage_dict:
if not key in usages:
usages[key] = TargetUsageList()
usages[key].extend(usage_dict[key])
return usages