# Natural Language Toolkit: API for Corpus Readers # # Copyright (C) 2001-2022 NLTK Project # Author: Steven Bird # Edward Loper # URL: # For license information, see LICENSE.TXT """ API for corpus readers. """ import os import re from collections import defaultdict from itertools import chain from nltk.corpus.reader.util import * from nltk.data import FileSystemPathPointer, PathPointer, ZipFilePathPointer class CorpusReader: """ A base class for "corpus reader" classes, each of which can be used to read a specific corpus format. Each individual corpus reader instance is used to read a specific corpus, consisting of one or more files under a common root directory. Each file is identified by its ``file identifier``, which is the relative path to the file from the root directory. A separate subclass is defined for each corpus format. These subclasses define one or more methods that provide 'views' on the corpus contents, such as ``words()`` (for a list of words) and ``parsed_sents()`` (for a list of parsed sentences). Called with no arguments, these methods will return the contents of the entire corpus. For most corpora, these methods define one or more selection arguments, such as ``fileids`` or ``categories``, which can be used to select which portion of the corpus should be returned. """ def __init__(self, root, fileids, encoding="utf8", tagset=None): """ :type root: PathPointer or str :param root: A path pointer identifying the root directory for this corpus. If a string is specified, then it will be converted to a ``PathPointer`` automatically. :param fileids: A list of the files that make up this corpus. This list can either be specified explicitly, as a list of strings; or implicitly, as a regular expression over file paths. The absolute path for each file will be constructed by joining the reader's root to each file name. :param encoding: The default unicode encoding for the files that make up the corpus. The value of ``encoding`` can be any of the following: - A string: ``encoding`` is the encoding name for all files. - A dictionary: ``encoding[file_id]`` is the encoding name for the file whose identifier is ``file_id``. If ``file_id`` is not in ``encoding``, then the file contents will be processed using non-unicode byte strings. - A list: ``encoding`` should be a list of ``(regexp, encoding)`` tuples. The encoding for a file whose identifier is ``file_id`` will be the ``encoding`` value for the first tuple whose ``regexp`` matches the ``file_id``. If no tuple's ``regexp`` matches the ``file_id``, the file contents will be processed using non-unicode byte strings. - None: the file contents of all files will be processed using non-unicode byte strings. :param tagset: The name of the tagset used by this corpus, to be used for normalizing or converting the POS tags returned by the ``tagged_...()`` methods. """ # Convert the root to a path pointer, if necessary. if isinstance(root, str) and not isinstance(root, PathPointer): m = re.match(r"(.*\.zip)/?(.*)$|", root) zipfile, zipentry = m.groups() if zipfile: root = ZipFilePathPointer(zipfile, zipentry) else: root = FileSystemPathPointer(root) elif not isinstance(root, PathPointer): raise TypeError("CorpusReader: expected a string or a PathPointer") # If `fileids` is a regexp, then expand it. if isinstance(fileids, str): fileids = find_corpus_fileids(root, fileids) self._fileids = fileids """A list of the relative paths for the fileids that make up this corpus.""" self._root = root """The root directory for this corpus.""" self._readme = "README" self._license = "LICENSE" self._citation = "citation.bib" # If encoding was specified as a list of regexps, then convert # it to a dictionary. if isinstance(encoding, list): encoding_dict = {} for fileid in self._fileids: for x in encoding: (regexp, enc) = x if re.match(regexp, fileid): encoding_dict[fileid] = enc break encoding = encoding_dict self._encoding = encoding """The default unicode encoding for the fileids that make up this corpus. If ``encoding`` is None, then the file contents are processed using byte strings.""" self._tagset = tagset def __repr__(self): if isinstance(self._root, ZipFilePathPointer): path = f"{self._root.zipfile.filename}/{self._root.entry}" else: path = "%s" % self._root.path return f"<{self.__class__.__name__} in {path!r}>" def ensure_loaded(self): """ Load this corpus (if it has not already been loaded). This is used by LazyCorpusLoader as a simple method that can be used to make sure a corpus is loaded -- e.g., in case a user wants to do help(some_corpus). """ pass # no need to actually do anything. def readme(self): """ Return the contents of the corpus README file, if it exists. """ with self.open(self._readme) as f: return f.read() def license(self): """ Return the contents of the corpus LICENSE file, if it exists. """ with self.open(self._license) as f: return f.read() def citation(self): """ Return the contents of the corpus citation.bib file, if it exists. """ with self.open(self._citation) as f: return f.read() def fileids(self): """ Return a list of file identifiers for the fileids that make up this corpus. """ return self._fileids def abspath(self, fileid): """ Return the absolute path for the given file. :type fileid: str :param fileid: The file identifier for the file whose path should be returned. :rtype: PathPointer """ return self._root.join(fileid) def abspaths(self, fileids=None, include_encoding=False, include_fileid=False): """ Return a list of the absolute paths for all fileids in this corpus; or for the given list of fileids, if specified. :type fileids: None or str or list :param fileids: Specifies the set of fileids for which paths should be returned. Can be None, for all fileids; a list of file identifiers, for a specified set of fileids; or a single file identifier, for a single file. Note that the return value is always a list of paths, even if ``fileids`` is a single file identifier. :param include_encoding: If true, then return a list of ``(path_pointer, encoding)`` tuples. :rtype: list(PathPointer) """ if fileids is None: fileids = self._fileids elif isinstance(fileids, str): fileids = [fileids] paths = [self._root.join(f) for f in fileids] if include_encoding and include_fileid: return list(zip(paths, [self.encoding(f) for f in fileids], fileids)) elif include_fileid: return list(zip(paths, fileids)) elif include_encoding: return list(zip(paths, [self.encoding(f) for f in fileids])) else: return paths def raw(self, fileids=None): """ :param fileids: A list specifying the fileids that should be used. :return: the given file(s) as a single string. :rtype: str """ if fileids is None: fileids = self._fileids elif isinstance(fileids, str): fileids = [fileids] contents = [] for f in fileids: with self.open(f) as fp: contents.append(fp.read()) return concat(contents) def open(self, file): """ Return an open stream that can be used to read the given file. If the file's encoding is not None, then the stream will automatically decode the file's contents into unicode. :param file: The file identifier of the file to read. """ encoding = self.encoding(file) stream = self._root.join(file).open(encoding) return stream def encoding(self, file): """ Return the unicode encoding for the given corpus file, if known. If the encoding is unknown, or if the given file should be processed using byte strings (str), then return None. """ if isinstance(self._encoding, dict): return self._encoding.get(file) else: return self._encoding def _get_root(self): return self._root root = property( _get_root, doc=""" The directory where this corpus is stored. :type: PathPointer""", ) ###################################################################### # { Corpora containing categorized items ###################################################################### class CategorizedCorpusReader: """ A mixin class used to aid in the implementation of corpus readers for categorized corpora. This class defines the method ``categories()``, which returns a list of the categories for the corpus or for a specified set of fileids; and overrides ``fileids()`` to take a ``categories`` argument, restricting the set of fileids to be returned. Subclasses are expected to: - Call ``__init__()`` to set up the mapping. - Override all view methods to accept a ``categories`` parameter, which can be used *instead* of the ``fileids`` parameter, to select which fileids should be included in the returned view. """ def __init__(self, kwargs): """ Initialize this mapping based on keyword arguments, as follows: - cat_pattern: A regular expression pattern used to find the category for each file identifier. The pattern will be applied to each file identifier, and the first matching group will be used as the category label for that file. - cat_map: A dictionary, mapping from file identifiers to category labels. - cat_file: The name of a file that contains the mapping from file identifiers to categories. The argument ``cat_delimiter`` can be used to specify a delimiter. The corresponding argument will be deleted from ``kwargs``. If more than one argument is specified, an exception will be raised. """ self._f2c = None #: file-to-category mapping self._c2f = None #: category-to-file mapping self._pattern = None #: regexp specifying the mapping self._map = None #: dict specifying the mapping self._file = None #: fileid of file containing the mapping self._delimiter = None #: delimiter for ``self._file`` if "cat_pattern" in kwargs: self._pattern = kwargs["cat_pattern"] del kwargs["cat_pattern"] elif "cat_map" in kwargs: self._map = kwargs["cat_map"] del kwargs["cat_map"] elif "cat_file" in kwargs: self._file = kwargs["cat_file"] del kwargs["cat_file"] if "cat_delimiter" in kwargs: self._delimiter = kwargs["cat_delimiter"] del kwargs["cat_delimiter"] else: raise ValueError( "Expected keyword argument cat_pattern or " "cat_map or cat_file." ) if "cat_pattern" in kwargs or "cat_map" in kwargs or "cat_file" in kwargs: raise ValueError( "Specify exactly one of: cat_pattern, " "cat_map, cat_file." ) def _init(self): self._f2c = defaultdict(set) self._c2f = defaultdict(set) if self._pattern is not None: for file_id in self._fileids: category = re.match(self._pattern, file_id).group(1) self._add(file_id, category) elif self._map is not None: for (file_id, categories) in self._map.items(): for category in categories: self._add(file_id, category) elif self._file is not None: with self.open(self._file) as f: for line in f.readlines(): line = line.strip() file_id, categories = line.split(self._delimiter, 1) if file_id not in self.fileids(): raise ValueError( "In category mapping file %s: %s " "not found" % (self._file, file_id) ) for category in categories.split(self._delimiter): self._add(file_id, category) def _add(self, file_id, category): self._f2c[file_id].add(category) self._c2f[category].add(file_id) def categories(self, fileids=None): """ Return a list of the categories that are defined for this corpus, or for the file(s) if it is given. """ if self._f2c is None: self._init() if fileids is None: return sorted(self._c2f) if isinstance(fileids, str): fileids = [fileids] return sorted(set.union(*(self._f2c[d] for d in fileids))) def fileids(self, categories=None): """ Return a list of file identifiers for the files that make up this corpus, or that make up the given category(s) if specified. """ if categories is None: return super().fileids() elif isinstance(categories, str): if self._f2c is None: self._init() if categories in self._c2f: return sorted(self._c2f[categories]) else: raise ValueError("Category %s not found" % categories) else: if self._f2c is None: self._init() return sorted(set.union(*(self._c2f[c] for c in categories))) def _resolve(self, fileids, categories): if fileids is not None and categories is not None: raise ValueError("Specify fileids or categories, not both") if categories is not None: return self.fileids(categories) else: return fileids def raw(self, fileids=None, categories=None): return super().raw(self._resolve(fileids, categories)) def words(self, fileids=None, categories=None): return super().words(self._resolve(fileids, categories)) def sents(self, fileids=None, categories=None): return super().sents(self._resolve(fileids, categories)) def paras(self, fileids=None, categories=None): return super().paras(self._resolve(fileids, categories)) ###################################################################### # { Treebank readers ###################################################################### # [xx] is it worth it to factor this out? class SyntaxCorpusReader(CorpusReader): """ An abstract base class for reading corpora consisting of syntactically parsed text. Subclasses should define: - ``__init__``, which specifies the location of the corpus and a method for detecting the sentence blocks in corpus files. - ``_read_block``, which reads a block from the input stream. - ``_word``, which takes a block and returns a list of list of words. - ``_tag``, which takes a block and returns a list of list of tagged words. - ``_parse``, which takes a block and returns a list of parsed sentences. """ def _parse(self, s): raise NotImplementedError() def _word(self, s): raise NotImplementedError() def _tag(self, s): raise NotImplementedError() def _read_block(self, stream): raise NotImplementedError() def parsed_sents(self, fileids=None): reader = self._read_parsed_sent_block return concat( [ StreamBackedCorpusView(fileid, reader, encoding=enc) for fileid, enc in self.abspaths(fileids, True) ] ) def tagged_sents(self, fileids=None, tagset=None): def reader(stream): return self._read_tagged_sent_block(stream, tagset) return concat( [ StreamBackedCorpusView(fileid, reader, encoding=enc) for fileid, enc in self.abspaths(fileids, True) ] ) def sents(self, fileids=None): reader = self._read_sent_block return concat( [ StreamBackedCorpusView(fileid, reader, encoding=enc) for fileid, enc in self.abspaths(fileids, True) ] ) def tagged_words(self, fileids=None, tagset=None): def reader(stream): return self._read_tagged_word_block(stream, tagset) return concat( [ StreamBackedCorpusView(fileid, reader, encoding=enc) for fileid, enc in self.abspaths(fileids, True) ] ) def words(self, fileids=None): return concat( [ StreamBackedCorpusView(fileid, self._read_word_block, encoding=enc) for fileid, enc in self.abspaths(fileids, True) ] ) # ------------------------------------------------------------ # { Block Readers def _read_word_block(self, stream): return list(chain.from_iterable(self._read_sent_block(stream))) def _read_tagged_word_block(self, stream, tagset=None): return list(chain.from_iterable(self._read_tagged_sent_block(stream, tagset))) def _read_sent_block(self, stream): return list(filter(None, [self._word(t) for t in self._read_block(stream)])) def _read_tagged_sent_block(self, stream, tagset=None): return list( filter(None, [self._tag(t, tagset) for t in self._read_block(stream)]) ) def _read_parsed_sent_block(self, stream): return list(filter(None, [self._parse(t) for t in self._read_block(stream)])) # } End of Block Readers # ------------------------------------------------------------