Source code for languagechange.corpora

"""Corpus utilities for line-level corpora and search helpers."""

import bz2
import gzip
import logging
import os
import re
from typing import List, Pattern, Self, Union
import lxml.etree as ET
from sortedcontainers import SortedKeyList
import trankit
from languagechange.resource_manager import LanguageChange
from languagechange.search import SearchTerm
from languagechange.usages import TargetUsage, TargetUsageList, UsageDictionary
from languagechange.utils import LiteralTime

# Configure the root logger when this module is imported: every record is
# formatted as "<time> : <level> : <message>" and the threshold is INFO.
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)


class Line:
    """Wraps a corpus line with token, lemma, and POS metadata.

    A line may carry any combination of raw text, a token list, a lemma list
    and a POS-tag list. Accessors fall back from one representation to
    another when the preferred one was not supplied.
    """

    def __init__(self, raw_text=None, tokens=None, lemmas=None, pos_tags=None, fname=None,
                 raw_lemma_text=None, raw_pos_text=None, **kwargs):
        """Store the given representations of the line.

        Args:
            raw_text: The line as a single string.
            tokens: List of token strings.
            lemmas: List of lemma strings.
            pos_tags: List of POS-tag strings.
            fname: Name of the file the line was read from.
            raw_lemma_text: The lemmas as a single string.
            raw_pos_text: The POS tags as a single string.
            **kwargs: Any further values; each becomes an instance attribute
                (``search`` looks for ``date`` and ``id``).
        """
        self._raw_text = raw_text
        self._raw_lemma_text = raw_lemma_text
        self._raw_pos_text = raw_pos_text
        self._tokens = tokens
        self._lemmas = lemmas
        self._pos_tags = pos_tags
        self._fname = fname
        self.__dict__.update(kwargs)

    def tokens(self):
        """Return the token list, or the lemma list when no tokens are stored."""
        if self._tokens is not None:
            return self._tokens
        return self._lemmas

    def lemmas(self):
        """Return the lemma list (``None`` when the line is not lemmatized)."""
        return self._lemmas

    def pos_tags(self):
        """Return the POS-tag list (``None`` when the line is not tagged)."""
        return self._pos_tags

    def tokens_by_feature(self, feat='token'):
        """Return the list selected by ``feat``.

        Args:
            feat: One of ``'token'``, ``'lemma'`` or ``'pos'``.

        Raises:
            ValueError: If ``feat`` is not one of the supported features.
        """
        if feat == 'token':
            return self.tokens()
        if feat == 'lemma':
            return self.lemmas()
        if feat == 'pos':
            return self.pos_tags()
        raise ValueError(f"'{feat}' is not a valid word feature")

    def raw_text(self):
        """Return the line as one string.

        Uses the stored raw text when present, otherwise the space-joined
        tokens, otherwise the space-joined lemmas.

        Raises:
            Exception: If the line holds neither raw text, tokens nor lemmas.
        """
        if self._raw_text is not None:
            return self._raw_text
        if self._tokens is not None:
            return ' '.join(self._tokens)
        if self._lemmas is not None:
            return ' '.join(self._lemmas)
        raise Exception('No valid data in Line')

    def raw_lemma_text(self):
        """Return the lemmas as one string (stored string or space-joined list)."""
        if self._raw_lemma_text is not None:
            return self._raw_lemma_text
        return ' '.join(self._lemmas)

    def raw_pos_text(self):
        """Return the POS tags as one string (stored string or space-joined list)."""
        if self._raw_pos_text is not None:
            return self._raw_pos_text
        return ' '.join(self._pos_tags)

    def raw_text_by_feature(self, feat='token'):
        """Return the string representation selected by ``feat``.

        Args:
            feat: One of ``'token'``, ``'lemma'`` or ``'pos'``.

        Raises:
            ValueError: If ``feat`` is not one of the supported features.
        """
        if feat == 'token':
            return self.raw_text()
        if feat == 'lemma':
            return self.raw_lemma_text()
        if feat == 'pos':
            return self.raw_pos_text()
        raise ValueError(f"'{feat}' is not a valid word feature")

    @staticmethod
    def _regex_offsets(word, text):
        """Return ``[start, end]`` offsets of ``word`` delimited by spaces or line ends in ``text``."""
        rex = re.compile(f'( |^)+{word}( |$)+', re.MULTILINE)
        offsets = []
        for found in rex.finditer(text):
            # The match includes the surrounding spaces; locate the word inside it.
            shift = text[found.start():found.end()].find(word)
            offsets.append([found.start() + shift, found.start() + shift + len(word)])
        return offsets

    def search(self, search_term: "SearchTerm", time=None) -> "TargetUsageList":
        """
        Searches the line given a search_term.

        Args:
            search_term : SearchTerm
            time : time value attached to every usage; a non-None ``date``
                attribute on the line takes precedence.
        Returns:
            A TargetUsageList of all matches.
        """
        # A line-level date wins, but a missing/None date must not erase the
        # time supplied by the caller.
        line_date = getattr(self, 'date', None)
        if line_date is not None:
            time = line_date
        line_id = getattr(self, 'id', 0)

        tul = TargetUsageList()
        for feat in search_term.word_feature:
            if search_term.regex:
                # NOTE(review): the nesting of this branch cannot be recovered
                # from the flattened source. Regex matching is attempted only
                # when `search_term.search_func` is truthy, and the built-in
                # matcher below is the one used -- confirm against SearchTerm.
                if search_term.search_func:
                    feature_text = self.raw_text_by_feature(feat)
                    for offsets in self._regex_offsets(search_term.term, feature_text):
                        tul.append(TargetUsage(self.raw_text(), offsets, time, id=line_id))
            else:
                for idx, token in enumerate(self.tokens_by_feature(feat)):
                    if search_term.term == token:
                        # Offsets are always computed on the token sequence,
                        # even when the match was made on lemmas or POS tags.
                        tokens = self.tokens()
                        start = len(' '.join(tokens[:idx])) + 1 if idx != 0 else 0
                        end = start + len(tokens[idx])
                        tul.append(TargetUsage(self.raw_text(), [start, end], time, id=line_id))
        return tul

    def __str__(self):
        """Return the same string as ``raw_text()``."""
        return self.raw_text()
class Corpus:
    """Base interface for corpora that support search and tokenization.

    Every processing method reads its input from ``self.line_iterator()``,
    which this class does not define; subclasses provide it and yield
    ``Line`` objects.
    """

    def __init__(self, name, language=None, time=LiteralTime('no time specification'),
                 time_function=None, skip_lines=0, **args):
        """Store the corpus metadata and determine its time value.

        Args:
            name: Identifier of the corpus.
            language: Language handed to ``trankit.Pipeline`` by the NLP methods.
            time: Time value used when neither ``time_function`` nor an
                ``extract_dates`` method is available.
            time_function: Optional callable; it is called with this corpus
                and its return value becomes ``self.time``.
            skip_lines: Stored as ``self.skip_lines`` for the line iterators.
            **args: Accepted and ignored here, so subclasses can forward
                their own keyword arguments unchanged.
        """
        self.name = name
        self.language = language
        if time_function is not None and callable(time_function):
            self.time = time_function(self)
        elif hasattr(self, 'extract_dates') and callable(self.extract_dates):
            self.time = self.extract_dates()
        else:
            self.time = time
        self.skip_lines = skip_lines

    def set_sentences_iterator(self, sentences):
        """Store ``sentences`` as ``self.sentences_iterator``."""
        self.sentences_iterator = sentences

    def _text_lines(self):
        """Yield ``(line, raw_text)`` for every line whose raw text is a non-blank string."""
        for line in self.line_iterator():
            text = line.raw_text()
            if isinstance(text, str) and len(text.strip()) > 0:
                yield line, text

    def _token_lines(self):
        """Yield ``(line, tokens)`` for every line whose tokens are a non-empty list."""
        for line in self.line_iterator():
            tokens = line.tokens()
            if isinstance(tokens, list) and len(tokens) > 0:
                yield line, tokens

    @staticmethod
    def _batched(items, batch_size):
        """Yield lists of ``batch_size`` consecutive items, then the non-empty remainder."""
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    def search(self, search_terms: List[str | Pattern | SearchTerm]) -> UsageDictionary:
        """
        Searches through the corpora by calling Line.search() on all lines.

        Args:
            search_terms : List[ str | Pattern | SearchTerm ]
                If a search term is str or Pattern it is converted to a
                SearchTerm and matches tokens only
                SearchTerm(word_feature = 'token').
        Returns:
            A UsageDictionary containing all search results for each search term.
        """
        usage_dictionary = UsageDictionary()
        targets = []
        for st in search_terms:
            if not isinstance(st, SearchTerm):
                st = SearchTerm(st, regex=isinstance(st, Pattern))
            tul = TargetUsageList()
            usage_dictionary[st.term] = tul
            targets.append((st, tul))

        n_usages = 0
        if targets:
            # Read the corpus once and test every term against each line,
            # instead of re-reading all files once per search term.
            for line in self.line_iterator():
                for st, tul in targets:
                    match: List[TargetUsage] = line.search(st, time=self.time)
                    tul.extend(match)
                    n_usages += len(match)
        logging.info(f"{n_usages} usages found.")
        return usage_dictionary

    def tokenize(self, tokenizer="trankit", split_sentences=False, batch_size=128):
        """Yield tokenized sentences using Trankit, optionally splitting sentences.

        Args:
            tokenizer (str, optional): Tokenizer backend. Defaults to "trankit".
                Any other value must be a callable taking the raw text, or an
                object with a callable ``tokenize`` attribute.
            split_sentences (bool, optional): Split paragraphs into sentences.
                Defaults to False.
            batch_size (int, optional): Number of lines to accumulate before
                processing. Defaults to 128.

        Yields:
            Line: new lines (one per detected sentence) when splitting
            sentences, otherwise the corpus lines with ``_tokens`` filled in.
            Lines without non-blank raw text are skipped.
        """
        if tokenizer == "trankit":
            p = trankit.Pipeline(self.language)
            if split_sentences:
                texts_iter = (text for _, text in self._text_lines())
                for texts in self._batched(texts_iter, batch_size):
                    tokenized = p.tokenize(' '.join(texts))
                    for sentence in tokenized['sentences']:
                        yield Line(raw_text=sentence['text'],
                                   tokens=[token['text'] for token in sentence['tokens']])
            else:
                for line, text in self._text_lines():
                    tokenized_sentence = p.tokenize(text, is_sent=True)
                    line._tokens = [token['text'] for token in tokenized_sentence['tokens']]
                    yield line
            return

        if hasattr(tokenizer, "tokenize") and callable(getattr(tokenizer, "tokenize")):
            tokenizer = tokenizer.tokenize
        if not callable(tokenizer):
            logging.error(f"Tokenizer {tokenizer!r} is neither 'trankit' nor callable; nothing was tokenized.")
            return
        try:
            for line, text in self._text_lines():
                line._tokens = [str(token) for token in tokenizer(text)]
                yield line
        except Exception:
            # Best effort: stop iterating, but keep the traceback in the log.
            logging.exception(f"Could not use tokenizer {tokenizer} directly as a function to tokenize.")

    def lemmatize(self, lemmatizer="trankit", pretokenized=False, tokenize=False,
                  split_sentences=False, batch_size=128):
        """Yield lemmatized lines.

        Args:
            lemmatizer: "trankit", a callable, or an object with a callable
                ``lemmatize`` attribute. A custom callable receives the token
                list when ``pretokenized`` is true, else the raw text.
            pretokenized (bool): Use the tokens already present on each line.
            tokenize (bool): Trankit only; also store the tokens produced
                during lemmatization (ignored for pretokenized input).
            split_sentences (bool): Trankit only; join ``batch_size`` lines
                and let Trankit split them into sentences.
            batch_size (int): Number of lines processed together.

        Yields:
            Line: new lines when splitting sentences, otherwise the corpus
            lines with ``_lemmas`` filled in.
        """
        if lemmatizer == "trankit":
            p = trankit.Pipeline(self.language)
            # input which is not sentence split
            if split_sentences:
                texts_iter = (text for _, text in self._text_lines())
                for texts in self._batched(texts_iter, batch_size):
                    lemmatized = p.lemmatize(' '.join(texts))
                    for sentence in lemmatized['sentences']:
                        yield Line(raw_text=sentence['text'],
                                   lemmas=[token['lemma'] for token in sentence['tokens']],
                                   tokens=[token['text'] for token in sentence['tokens']] if tokenize else None)
            # input which is not pretokenized, but each line is its own sentence
            elif not pretokenized:
                for line, text in self._text_lines():
                    lemmatized_sentence = p.lemmatize(text, is_sent=True)
                    line._lemmas = [token['lemma'] for token in lemmatized_sentence['tokens']]
                    if tokenize:
                        # Mirrors pos_tagging(): keep the tokens when asked to.
                        line._tokens = [token['text'] for token in lemmatized_sentence['tokens']]
                    yield line
            # pretokenized input, one or more sentences at a time
            else:
                for batch in self._batched(self._token_lines(), batch_size):
                    lemmatized_sentences = p.lemmatize([tokens for _, tokens in batch])['sentences']
                    for i, (line, _) in enumerate(batch):
                        line._lemmas = [token['lemma'] for token in lemmatized_sentences[i]['tokens']]
                        yield line
            return

        # todo: add other lemmatizers if needed
        if hasattr(lemmatizer, "lemmatize") and callable(getattr(lemmatizer, "lemmatize")):
            lemmatizer = lemmatizer.lemmatize
        if not callable(lemmatizer):
            logging.error(f"Lemmatizer {lemmatizer!r} is neither 'trankit' nor callable; nothing was lemmatized.")
            return
        try:
            if pretokenized:
                for line, tokens in self._token_lines():
                    line._lemmas = [str(lemma) for lemma in lemmatizer(tokens)]
                    yield line
            else:
                for line, text in self._text_lines():
                    line._lemmas = [str(lemma) for lemma in lemmatizer(text)]
                    yield line
        except Exception:
            logging.exception(f"Could not use method {lemmatizer} directly as a function to lemmatize.")

    def pos_tagging(self, pos_tagger="trankit", pretokenized=False, tokenize=False,
                    split_sentences=False, batch_size=128):
        """Yield POS-tagged lines.

        Args:
            pos_tagger: "trankit", a callable, or an object with a callable
                ``pos_tag`` attribute. A custom callable receives the token
                list when ``pretokenized`` is true, else the raw text.
            pretokenized (bool): Use the tokens already present on each line.
            tokenize (bool): Trankit only; also store the tokens produced
                during tagging (ignored for pretokenized input).
            split_sentences (bool): Trankit only; join ``batch_size`` lines
                and let Trankit split them into sentences.
            batch_size (int): Number of lines processed together.

        Yields:
            Line: new lines when splitting sentences, otherwise the corpus
            lines with ``_pos_tags`` filled in (the ``upos`` value per token).
        """
        if pos_tagger == "trankit":
            p = trankit.Pipeline(self.language)
            # input which is not sentence split
            if split_sentences:
                texts_iter = (text for _, text in self._text_lines())
                for texts in self._batched(texts_iter, batch_size):
                    pos_tagged = p.posdep(' '.join(texts))
                    for sentence in pos_tagged['sentences']:
                        yield Line(raw_text=sentence['text'],
                                   pos_tags=[token['upos'] for token in sentence['tokens']],
                                   tokens=[token['text'] for token in sentence['tokens']] if tokenize else None)
            # input which is not pretokenized, but each line is its own sentence
            elif not pretokenized:
                for line, text in self._text_lines():
                    pos_tagged_sentence = p.posdep(text, is_sent=True)
                    line._pos_tags = [token['upos'] for token in pos_tagged_sentence['tokens']]
                    if tokenize:
                        line._tokens = [token['text'] for token in pos_tagged_sentence['tokens']]
                    yield line
            # pretokenized input, one or more sentences at a time
            else:
                for batch in self._batched(self._token_lines(), batch_size):
                    pos_tagged_sentences = p.posdep([tokens for _, tokens in batch])['sentences']
                    for i, (line, _) in enumerate(batch):
                        line._pos_tags = [token['upos'] for token in pos_tagged_sentences[i]['tokens']]
                        yield line
            return

        if hasattr(pos_tagger, "pos_tag") and callable(getattr(pos_tagger, "pos_tag")):
            pos_tagger = pos_tagger.pos_tag
        if not callable(pos_tagger):
            logging.error(f"POS tagger {pos_tagger!r} is neither 'trankit' nor callable; nothing was tagged.")
            return
        try:
            if pretokenized:
                for line, tokens in self._token_lines():
                    line._pos_tags = [str(pos_tag) for pos_tag in pos_tagger(tokens)]
                    yield line
            else:
                for line, text in self._text_lines():
                    line._pos_tags = [str(pos_tag) for pos_tag in pos_tagger(text)]
                    yield line
        except Exception:
            logging.exception(f"Could not use method {pos_tagger} directly as a function to perform POS tagging.")

    def tokens_lemmas_pos_tags(self, nlp_model="trankit", tokens=True, split_sentences=False, batch_size=128):
        """Yield lines carrying lemmas and POS tags, and tokens when requested.

        Only the "trankit" model is implemented; any other value logs an
        error and yields nothing.

        Args:
            nlp_model: Backend name; only "trankit" is supported.
            tokens (bool): Also store the tokens on the yielded lines.
            split_sentences (bool): Join ``batch_size`` lines and let Trankit
                split them into sentences.
            batch_size (int): Number of lines processed together.
        """
        if nlp_model != "trankit":
            logging.error(f"NLP model {nlp_model!r} is not supported; only 'trankit' is available.")
            return
        p = trankit.Pipeline(self.language)
        if not split_sentences:
            for line, text in self._text_lines():
                lemmatized_sentence = p.lemmatize(text, is_sent=True)
                line._lemmas = [token['lemma'] for token in lemmatized_sentence['tokens']]
                if tokens:
                    line._tokens = [token['text'] for token in lemmatized_sentence['tokens']]
                    pos_tagged = p.posdep(line.tokens(), is_sent=True)
                else:
                    pos_tagged = p.posdep(line.raw_text(), is_sent=True)
                line._pos_tags = [token['upos'] for token in pos_tagged['tokens']]
                yield line
        else:
            texts_iter = (text for _, text in self._text_lines())
            for texts in self._batched(texts_iter, batch_size):
                lemmatized_sentences = p.lemmatize(' '.join(texts))['sentences']
                # Kept under its own name: reusing `tokens` here would hide
                # the boolean flag and force tokens into every yielded line.
                token_lists = [[token['text'] for token in sentence['tokens']]
                               for sentence in lemmatized_sentences]
                pos_tagged_sentences = p.posdep(token_lists)['sentences']
                for i, sentence in enumerate(lemmatized_sentences):
                    yield Line(raw_text=sentence['text'],
                               tokens=token_lists[i] if tokens else None,
                               lemmas=[token['lemma'] for token in sentence['tokens']],
                               pos_tags=[token['upos'] for token in pos_tagged_sentences[i]['tokens']])

    # preliminary function
    def segment_sentences(self, segmentizer="trankit", batch_size=128):
        """Yield one ``Line`` per sentence found in batches of joined lines.

        Args:
            segmentizer: "trankit", or a callable that takes a string and
                returns an iterable of sentence strings.
            batch_size (int): Number of lines joined (with spaces) per call.
        """
        raw_texts = (line.raw_text() for line in self.line_iterator())
        if segmentizer == "trankit":
            p = trankit.Pipeline(self.language)
            for lines in self._batched(raw_texts, batch_size):
                sentences = p.ssplit(' '.join(lines))
                for sent in sentences['sentences']:
                    yield Line(sent['text'])
        elif callable(segmentizer):
            try:
                for lines in self._batched(raw_texts, batch_size):
                    for sent in segmentizer(' '.join(lines)):
                        yield Line(sent)
            except Exception:
                # `except Exception` rather than a bare except: closing this
                # generator early raises GeneratorExit at the yield, which
                # must propagate instead of being reported as a failure.
                logging.exception(f"Could not use method {segmentizer} directly as a function to split sentences.")

    def folder_iterator(self, path):
        """Return the paths of all files below ``path``, recursing into sub-folders.

        Entries are visited in sorted order so the result does not depend on
        the arbitrary order returned by ``os.listdir``.
        """
        fnames = []
        for fname in sorted(os.listdir(path)):
            full_path = os.path.join(path, fname)
            if os.path.isdir(full_path):
                fnames.extend(self.folder_iterator(full_path))
            else:
                fnames.append(full_path)
        return fnames

    def cast_to_vertical(corpora, vertical_corpus):
        """Write parallel corpora to one vertical file, one column per corpus.

        The corpora are read in lock-step; iteration stops as soon as one of
        them is exhausted. For each group of parallel lines, row ``j`` holds
        the ``j``-th item of every line's ``tokens()`` joined with
        ``vertical_corpus.field_separator``, and each group is followed by
        ``vertical_corpus.sentence_separator``.

        NOTE(review): the previous implementation could not run (it indexed
        an always-empty list), so the intended column content is inferred;
        ``Line.tokens()`` is used, which yields the lemmas for lemma-only
        lines. Confirm against the callers.

        Args:
            corpora: A corpus or a list of corpora with aligned lines.
            vertical_corpus: Object providing ``path``, ``field_separator``
                and ``sentence_separator``.
        """
        if not isinstance(corpora, (list, tuple)):
            corpora = [corpora]
        line_iterators = [corpus.line_iterator() for corpus in corpora]
        with open(vertical_corpus.path, 'w+') as f:
            for parallel_lines in zip(*line_iterators):
                columns = [line.tokens() for line in parallel_lines]
                for row in zip(*columns):
                    f.write(vertical_corpus.field_separator.join(row) + '\n')
                f.write(vertical_corpus.sentence_separator)

    def save(self):
        """Register this corpus with the ``LanguageChange`` resource manager."""
        lc = LanguageChange()
        lc.save_resource('corpus', f'{self.language} corpora', self.name)

    def save_tokenized_corpora(corpora: Union[Self, List[Self]], tokens=True, lemmas=False, pos=False,
                               save_format='linebyline', file_specification=None, file_ending=".txt",
                               tokenizer="trankit", lemmatizer="trankit", pos_tagger="trankit",
                               split_sentences=True, batch_size=128):
        """Process one or more corpora and write the result next to each source file.

        The output path is the corpus path without its extension, followed by
        ``file_specification`` and ``file_ending``.

        Args:
            corpora: A corpus or a list of corpora; each needs a ``path``.
            tokens, lemmas, pos (bool): Which annotations to produce. In
                'linebyline' format only the first enabled one (in this
                order) is written; in 'vertical' format they become columns.
            save_format: 'linebyline' or 'vertical'.
            file_specification: Suffix inserted before ``file_ending``; when
                None it is built from the enabled annotations
                ("-tokens", "-lemmas", "-pos").
            file_ending: Extension of the output file.
            tokenizer, lemmatizer, pos_tagger: Backends handed to the
                corresponding corpus methods.
            split_sentences (bool): Forwarded to the processing methods.
            batch_size (int): Forwarded to the processing methods.

        Raises:
            ValueError: If ``save_format`` is not a supported format.
        """
        if save_format not in ('linebyline', 'vertical'):
            # Fail before any file is created instead of leaving empty files behind.
            raise ValueError(f"'{save_format}' is not a valid save format")
        if not isinstance(corpora, list):
            corpora = [corpora]
        if file_specification is None:
            file_specification = ""
            file_specification += "-tokens" if tokens else ''
            file_specification += '-lemmas' if lemmas else ''
            file_specification += '-pos' if pos else ''

        common = {'split_sentences': split_sentences, 'batch_size': batch_size}
        for corpus in corpora:
            tokenized_name = os.path.splitext(corpus.path)[0] + file_specification + file_ending
            with open(tokenized_name, 'w+') as f:
                # cache is probably needed here because the file might already exist.
                if save_format == 'linebyline':
                    if tokens:
                        for line in corpus.tokenize(tokenizer, **common):
                            f.write(' '.join(line.tokens()) + '\n')
                    elif lemmas:
                        for line in corpus.lemmatize(lemmatizer, **common):
                            f.write(' '.join(line.lemmas()) + '\n')
                    elif pos:
                        for line in corpus.pos_tagging(pos_tagger, **common):
                            f.write(' '.join(line.pos_tags()) + '\n')
                else:
                    if lemmas and pos:
                        # tokens_lemmas_pos (with or without tokens)
                        lines = corpus.tokens_lemmas_pos_tags(tokenizer, tokens=tokens, **common)
                    elif lemmas:
                        # lemmatize (with or without tokens)
                        lines = corpus.lemmatize(lemmatizer, tokenize=tokens, **common)
                    elif pos:
                        # pos_tagging (with or without tokens)
                        lines = corpus.pos_tagging(pos_tagger, tokenize=tokens, **common)
                    elif tokens:
                        # tokenize only
                        lines = corpus.tokenize(tokenizer, **common)
                    else:
                        lines = ()
                    for line in lines:
                        # Line.tokens() falls back to the lemmas, so the token
                        # column is only taken when tokens were requested;
                        # otherwise the lemmas would be written twice.
                        fields = [line.tokens() if tokens else None, line.lemmas(), line.pos_tags()]
                        fields = [field for field in fields if field is not None]
                        for row in zip(*fields):
                            f.write('\t'.join(row) + '\n')
                        f.write('\n')
class LinebyLineCorpus(Corpus):
    """Corpus read from plain-text (``.txt``) or gzip (``.gz``) files, one line per row."""

    def __init__(self, path, **kwargs):
        """Create the corpus and derive its tokenization flags.

        Args:
            path: File or folder to read.
            **kwargs: Forwarded to ``Corpus.__init__`` (``name`` defaults to
                ``path``). The following keys are also read here:
                ``is_sentence_tokenized``, ``is_tokenized``, ``is_lemmatized``
                (all default to False) and ``tokens_splitter`` (default
                ``' '``). A lemmatized corpus is treated as tokenized, and a
                tokenized corpus as sentence tokenized.
        """
        kwargs.setdefault('name', path)
        # Set before the base initialiser runs, because that one may call
        # `time_function(self)`, which can then read the path.
        self.path = path
        super().__init__(**kwargs)

        is_sentence_tokenized = kwargs.get('is_sentence_tokenized', False)
        is_tokenized = kwargs.get('is_tokenized', False)
        is_lemmatized = kwargs.get('is_lemmatized', False)
        if is_lemmatized and not is_tokenized:
            is_tokenized = True
        if is_tokenized and not is_sentence_tokenized:
            is_sentence_tokenized = True

        # All three flags are always defined, so line_iterator() never hits
        # a missing attribute whatever combination was passed.
        self.is_sentence_tokenized = is_sentence_tokenized
        self.is_tokenized = is_tokenized
        self.is_lemmatized = is_lemmatized
        self.tokens_splitter = kwargs.get('tokens_splitter', ' ')

    def _parse_line(self, line):
        """Return the ``Line`` keyword arguments for one text row."""
        text = line.replace('\n', '')
        data = {'raw_text': text}
        if self.is_lemmatized:
            data['lemmas'] = text.split(self.tokens_splitter)
        elif self.is_tokenized:
            data['tokens'] = text.split(self.tokens_splitter)
        return data

    def line_iterator(self):
        """Yield one ``Line`` per row of every file, skipping the first ``skip_lines`` rows of each.

        When ``path`` is a folder, all files below it are read.

        Raises:
            Exception: If a file name ends with neither ``.txt`` nor ``.gz``.
        """
        if os.path.isdir(self.path):
            fnames = self.folder_iterator(self.path)
        else:
            fnames = [self.path]

        for fname in fnames:
            if fname.endswith('.txt'):
                handle = open(fname, 'r')
            elif fname.endswith('.gz'):
                handle = gzip.open(fname, mode="rt")
            else:
                raise Exception('Format not recognized')
            with handle as f:
                for i, line in enumerate(f):
                    if i >= self.skip_lines:
                        yield Line(fname=fname, **self._parse_line(line))
class VerticalCorpus(Corpus):
    """Corpus in vertical format: one token per row, fields in columns, sentences delimited by a separator row."""

    def __init__(self, path, sentence_separator='\n', field_separator='\t',
                 field_map={'token': 0, 'lemma': 1, 'pos_tag': 2}, **args):
        """Create the corpus.

        Args:
            path: File or folder to read.
            sentence_separator: Exact content of the row that ends a sentence.
            field_separator: String separating the fields of a row.
            field_map: Maps 'token', 'lemma' and 'pos_tag' to column indices;
                'token' is required, the other two are optional.
            **args: Forwarded to ``Corpus.__init__`` (``name`` defaults to
                ``path``).
        """
        args.setdefault('name', path)
        # Set before the base initialiser runs, because that one may call
        # `time_function(self)`, which can then read the path.
        self.path = path
        super().__init__(**args)
        self.sentence_separator = sentence_separator
        self.field_separator = field_separator
        # Copied so instances never share (or mutate) the default dictionary.
        self.field_map = dict(field_map)

    def _sentence_data(self, rows):
        """Return the ``Line`` keyword arguments for the rows of one sentence."""
        split_rows = [row.strip('\n').split(self.field_separator) for row in rows]
        tokens = [fields[self.field_map['token']] for fields in split_rows]
        data = {'raw_text': ' '.join(tokens), 'tokens': tokens}
        if 'lemma' in self.field_map:
            data['lemmas'] = [fields[self.field_map['lemma']] for fields in split_rows]
        if 'pos_tag' in self.field_map:
            data['pos_tags'] = [fields[self.field_map['pos_tag']] for fields in split_rows]
        return data

    def line_iterator(self):
        """Yield one ``Line`` per sentence of every file.

        The first ``skip_lines`` rows of each file are ignored. A sentence
        ends at a row equal to ``sentence_separator``; rows remaining at the
        end of a file are yielded as a final sentence.

        Raises:
            Exception: If a file name ends with neither ``.txt`` nor ``.gz``.
        """
        if os.path.isdir(self.path):
            fnames = self.folder_iterator(self.path)
        else:
            fnames = [self.path]

        for fname in fnames:
            if fname.endswith('.txt'):
                handle = open(fname, 'r')
            elif fname.endswith('.gz'):
                handle = gzip.open(fname, mode="rt")
            else:
                raise Exception('Format not recognized')
            with handle as f:
                # Start every file with an empty sentence, for both formats.
                rows = []
                for i, vertical_line in enumerate(f):
                    if i < self.skip_lines:
                        continue
                    if vertical_line == self.sentence_separator:
                        yield Line(fname=fname, **self._sentence_data(rows))
                        rows = []
                    else:
                        rows.append(vertical_line)
                if rows:
                    # File ended without a closing separator row.
                    yield Line(fname=fname, **self._sentence_data(rows))
# Loads and parses a corpus in XML format.
# Only tokenized corpora are supported so far.
class XMLCorpus(Corpus):
    """Tokenized corpus stored as XML (``.xml`` or ``.xml.bz2``)."""

    def __init__(self, path, sentence_tag='sentence', token_tag='token', is_lemmatized=False,
                 lemma_tag=None, is_pos_tagged=False, pos_tag_tag=None, text_tag='text', **args):
        """Create the corpus.

        Args:
            path: File or folder to read.
            sentence_tag: Tag of the element enclosing one sentence.
            token_tag: Tag of the element holding one token as its text.
            is_lemmatized: Whether token elements carry a lemma attribute.
            lemma_tag: Name of that attribute; defaults to 'lemma' when
                ``is_lemmatized`` is true and is ignored otherwise.
            is_pos_tagged: Whether token elements carry a POS attribute.
            pos_tag_tag: Name of that attribute; defaults to 'pos' when
                ``is_pos_tagged`` is true and is ignored otherwise.
            text_tag: Tag of the element whose ``date`` attribute is attached
                to the lines read after it.
            **args: Forwarded to ``Corpus.__init__`` (``name`` defaults to
                ``path``).
        """
        args.setdefault('name', path)
        # Set before the base initialiser runs, because that one may call
        # `time_function(self)`, which can then read the path.
        self.path = path
        super().__init__(**args)

        self.is_lemmatized = bool(is_lemmatized)
        self.lemma_tag = (lemma_tag or 'lemma') if is_lemmatized else ''
        self.is_pos_tagged = bool(is_pos_tagged)
        self.pos_tag_tag = (pos_tag_tag or 'pos') if is_pos_tagged else ''

        self.sentence_tag = sentence_tag
        self.token_tag = token_tag
        self.text_tag = text_tag

    def get_attribute(self, tag, attribute):
        """Return the value of ``attribute`` on the element ``tag``.

        Raises:
            KeyError: If the element has no such attribute.
        """
        return tag.attrib[attribute]

    def _line_data(self, tokens, lemmas, pos_tags):
        """Return the ``Line`` keyword arguments for one sentence."""
        data = {'raw_text': ' '.join(tokens)}
        if self.is_lemmatized and lemmas:
            data['lemmas'] = lemmas
        if self.is_pos_tagged and pos_tags:
            data['pos_tags'] = pos_tags
        data['tokens'] = tokens
        return data

    def _read_xml(self, source, fname):
        """Yield one ``Line`` per non-empty sentence element parsed from ``source``.

        Args:
            source: File name or binary file object handed to ``iterparse``.
            fname: File name recorded on every yielded line.
        """
        tokens, lemmas, pos_tags = [], [], []
        # Stays None until a text element has been seen.
        date = None
        sentence_counter = 0
        for event, elem in ET.iterparse(source, events=('start', 'end')):
            if elem.sourceline < self.skip_lines:
                continue
            if elem.tag == self.text_tag:
                date = elem.get('date')
            if elem.tag == self.sentence_tag:
                if event == 'start':
                    tokens, lemmas, pos_tags = [], [], []
                # If the sentence has ended, create a new Line object with its content
                elif event == 'end':
                    if tokens:
                        data = self._line_data(tokens, lemmas, pos_tags)
                        data['date'] = date
                        # Sentences without an id attribute get their running index.
                        data['id'] = elem.get('id', sentence_counter)
                        yield Line(fname=fname, **data)
                    elem.clear()
                    sentence_counter += 1
            elif elem.tag == self.token_tag:
                if event == 'end':
                    if self.is_lemmatized:
                        lemmas.append(self.get_attribute(elem, self.lemma_tag))
                    if self.is_pos_tagged:
                        pos_tags.append(self.get_attribute(elem, self.pos_tag_tag))
                    # An empty token element has text None, which would break
                    # the later ' '.join(); keep it as an empty token instead.
                    tokens.append(elem.text if elem.text is not None else '')
                    elem.clear()
            elif event == 'end':
                elem.clear()

    def line_iterator(self):
        """Yield one ``Line`` per sentence of every file.

        When ``path`` is a folder, all files below it are read.

        Raises:
            Exception: If a file name ends with neither ``.xml`` nor ``.xml.bz2``.
        """
        if os.path.isdir(self.path):
            fnames = self.folder_iterator(self.path)
        else:
            fnames = [self.path]

        for fname in fnames:
            if fname.endswith('.xml'):
                yield from self._read_xml(fname, fname)
            elif fname.endswith('.xml.bz2'):
                with bz2.open(fname, 'r') as f:
                    yield from self._read_xml(f, fname)
            else:
                raise Exception('Format not recognized')

    # Cast to a LineByLine corpus and save the result in the path specified in there
    def cast_to_linebyline(self, linebyline_corpus: LinebyLineCorpus):
        """Write this corpus to ``linebyline_corpus.path``, one sentence per row.

        Lemmas are written when the target is lemmatized, tokens when it is
        tokenized, and the raw text otherwise. If the target is lemmatized
        but this corpus is not, an error is logged and nothing is written.

        Returns:
            None
        """
        savepath = linebyline_corpus.path
        tokens_splitter = getattr(linebyline_corpus, 'tokens_splitter', ' ')
        tokenized = linebyline_corpus.is_tokenized
        lemmatized = linebyline_corpus.is_lemmatized

        if lemmatized and not self.is_lemmatized:
            logging.error('cannot cast to lemmatized LinebyLineCorpus because this XMLCorpus is not lemmatized.')
            return None

        with open(savepath, 'w+') as f:
            for line in self.line_iterator():
                if lemmatized:
                    row = tokens_splitter.join(line.lemmas())
                elif tokenized:
                    row = tokens_splitter.join(line.tokens())
                else:
                    row = line.raw_text()
                f.write(row + '\n')  # cache needed here

    def cast_to_vertical(self, vertical_corpus: VerticalCorpus):
        """Write this corpus to ``vertical_corpus.path`` in vertical format.

        Columns follow the order of the indices in
        ``vertical_corpus.field_map``. If the map asks for lemmas or POS tags
        that this corpus does not have, an error is logged and nothing is
        written.

        Returns:
            None
        """
        savepath = vertical_corpus.path
        field_separator = vertical_corpus.field_separator
        sentence_separator = vertical_corpus.sentence_separator

        # We need to make sure that the line features (token, lemma, pos, etc.)
        # come in the same order as in the field_map in the vertical_corpus
        sorted_field_names = sorted(vertical_corpus.field_map, key=vertical_corpus.field_map.get)

        unavailable = []
        if 'lemma' in sorted_field_names and not self.is_lemmatized:
            unavailable.append('lemma')
        if 'pos_tag' in sorted_field_names and not self.is_pos_tagged:
            unavailable.append('pos_tag')
        if unavailable:
            logging.error(f"cannot cast to VerticalCorpus because this XMLCorpus has no {', '.join(unavailable)} data.")
            return None

        def get_line_feature(line, key):
            field_name_to_line_feature = {'token': line.tokens, 'lemma': line.lemmas, 'pos_tag': line.pos_tags}
            return field_name_to_line_feature[key]()

        with open(savepath, 'w+') as f:
            for line in self.line_iterator():
                columns = [get_line_feature(line, key) for key in sorted_field_names]
                for row in zip(*columns):
                    f.write(field_separator.join(row) + '\n')  # cache needed here
                f.write(sentence_separator)  # cache needed here
# A class for handling XML corpora specifically from spraakbanken.gu.se
class SprakBankenCorpus(XMLCorpus):
    """XML corpus with the defaults used for spraakbanken.gu.se data: lemmatized and POS tagged."""

    def __init__(self, path, sentence_tag='sentence', token_tag='token', is_lemmatized=True,
                 lemma_tag='lemma', is_pos_tagged=True, pos_tag_tag='pos', **args):
        """Create the corpus; all arguments are forwarded to ``XMLCorpus.__init__``."""
        super().__init__(path, sentence_tag=sentence_tag, token_tag=token_tag,
                         is_lemmatized=is_lemmatized, lemma_tag=lemma_tag,
                         is_pos_tagged=is_pos_tagged, pos_tag_tag=pos_tag_tag, **args)

    def get_attribute(self, tag, attribute):
        """Return the value of ``attribute`` on ``tag``, falling back to the element text.

        For the lemma attribute the value is treated as a ``|``-delimited
        list and its first entry is returned. The element text is returned
        when the attribute is missing or when the lemma list is empty.
        """
        # .get() instead of indexing, so a missing attribute reaches the
        # text fallback below instead of raising KeyError.
        content = tag.attrib.get(attribute)
        if content is not None:
            if attribute == self.lemma_tag:
                candidates = content.strip("|").split("|")
                if candidates != ['']:
                    return candidates[0]
            else:
                return content
        return tag.text
class HistoricalCorpus(SortedKeyList):
    """A ``SortedKeyList`` of corpora, ordered by ``key`` (by default their ``time``)."""

    def __new__(cls, *args, **kwargs):
        """Ensures only valid arguments go to SortedKeyList"""
        return super().__new__(cls)

    def __init__(self, corpora: Union[List[Corpus], str], key=lambda c: c.time,
                 corpus_type=None, time_function=None):
        """
        This class is a SortedKeyList of corpora. A historical corpus can be
        initialised either from a path where the files are located, or from a
        list of already instantiated Corpus objects.

        Args:
            corpora ([Corpus]|str): a list of corpora or a path where the
                corpora are stored.
            key (function, default = lambda c : c.time): the key by which the
                corpora are sorted. Default sorting is by time, in ascending
                order.
            corpus_type (str, default=None): the kind of corpus. Needs to be
                provided if initialising from a folder, and then needs to be
                one of 'line_by_line', 'vertical', 'xml' and 'sprakbanken'.
            time_function (function, default = None): the function used to
                extract a time value for each corpus. Needed if initialising
                from a folder.

        Raises:
            ValueError: If ``corpora`` is a path and ``corpus_type`` is not
                one of the supported kinds.
            TypeError: If ``corpora`` is neither a string nor a list, or the
                list holds an element that is not a ``Corpus``.
            Exception: If the folder at ``corpora`` cannot be listed.
        """
        if isinstance(corpora, str):
            corpus_classes = {
                'line_by_line': LinebyLineCorpus,
                'vertical': VerticalCorpus,
                'xml': XMLCorpus,
                'sprakbanken': SprakBankenCorpus,
            }
            if corpus_type not in corpus_classes:
                message = ("When initialising from a folder path, corpus_type must be one of "
                           "'line_by_line','vertical','xml' and 'sprakbanken'.")
                logging.error(message)
                raise ValueError(message)
            try:
                entries = os.listdir(corpora)
            except OSError as err:
                message = f"Could not use path {corpora} to intitialize corpora."
                logging.error(message)
                raise Exception(message) from err

            corpus_class = corpus_classes[corpus_type]
            corpora_list = []
            for entry in entries:
                corpus_path = os.path.join(corpora, entry)
                try:
                    corpora_list.append(corpus_class(corpus_path, time_function=time_function))
                except Exception:  # TODO: proper exception
                    # One unusable file is reported and skipped; the rest of
                    # the folder is still loaded.
                    logging.exception(f"Could not initialise a corpus from path {corpus_path}.")
            corpora = corpora_list
        elif isinstance(corpora, list):
            for corpus in corpora:
                if not isinstance(corpus, Corpus):
                    message = "Every element in 'corpora' needs to be a Corpus object."
                    logging.error(message)
                    raise TypeError(message)
        else:
            message = "'corpora' needs to be either a string or a list of Corpus objects."
            logging.error(message)
            raise TypeError(message)
        super().__init__(corpora, key)

    def line_iterator(self):
        """
        Iterates through all of the corpora, and yields all of the lines that
        are possible to get. A corpus that fails while being read is logged
        and the iteration continues with the next corpus.
        """
        for corpus in self:
            try:
                for line in corpus.line_iterator():
                    yield line
            except Exception:
                # `except Exception` rather than a bare except: GeneratorExit
                # (raised at the yield when the consumer closes this
                # generator) must propagate, otherwise the loop would go on
                # and yield again.
                logging.exception(f"Could not get lines from {corpus.name}.")

    def search(self, search_terms: List[str | Pattern | SearchTerm], index_by_corpus=False):
        """
        Searches through all of the corpora by calling search() for each of them.

        Args:
            search_terms : List[ str | Pattern | SearchTerm ]
                If search term is str or Pattern it is converted to a
                SearchTerm and matches tokens only
                SearchTerm(word_feature = 'token').
            index_by_corpus : bool, default False
                decides whether the usages for a given word should be a
                dictionary, with keys as the corpus names and values as lists
                of usages, or a list of all usages across corpora.
        Returns:
            a dictionary containing all search results from the included
            corpora. Corpora whose search fails are logged and skipped.
        """
        # Materialised once so a one-shot iterable is not used up by the first corpus.
        search_terms = list(search_terms)
        usages = {} if index_by_corpus else UsageDictionary()  # TODO: make the indexed form saveable
        for corpus in self:
            try:
                usage_dict: UsageDictionary = corpus.search(search_terms)
            except Exception:
                logging.exception(f"Could not search through {corpus.name}.")
                continue
            for key in usage_dict:
                if index_by_corpus:
                    if key not in usages:
                        usages[key] = {}
                    usages[key][corpus.name] = usage_dict[key]
                else:
                    if key not in usages:
                        usages[key] = TargetUsageList()
                    usages[key].extend(usage_dict[key])
        return usages