Source code for whoosh.reading

# Copyright 2007 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

"""This module contains classes that allow reading from an index.
"""

from abc import abstractmethod
from bisect import bisect_right
from heapq import heapify, heappop, heapreplace, nlargest
from math import log

from cached_property import cached_property

from whoosh import columns
from whoosh.filedb.filestore import OverlayStorage
from whoosh.matching import MultiMatcher
from whoosh.support.levenshtein import distance
from whoosh.system import emptybytes

# Exceptions


class ReaderClosed(Exception):
    """Exception raised when you try to do some operation on a closed searcher
    (or a Results object derived from a searcher that has since been closed).
    """

    message = "Operation on a closed reader"


[docs]class TermNotFound(Exception): pass
# Term Info base class
[docs]class TermInfo: """Represents a set of statistics about a term. This object is returned by :meth:`IndexReader.term_info`. These statistics may be useful for optimizations and scoring algorithms. """ def __init__( self, weight=0, df=0, minlength=None, maxlength=0, maxweight=0, minid=None, maxid=0, ): self._weight = weight self._df = df self._minlength = minlength self._maxlength = maxlength self._maxweight = maxweight self._minid = minid self._maxid = maxid def add_posting(self, docnum, weight, length=None): if self._minid is None: self._minid = docnum self._maxid = docnum self._weight += weight self._df += 1 self._maxweight = max(self._maxweight, weight) if length is not None: if self._minlength is None: self._minlength = length else: self._minlength = min(self._minlength, length) self._maxlength = max(self._maxlength, length)
[docs] def weight(self): """Returns the total frequency of the term across all documents.""" return self._weight
[docs] def doc_frequency(self): """Returns the number of documents the term appears in.""" return self._df
[docs] def min_length(self): """Returns the length of the shortest field value the term appears in. """ return self._minlength
[docs] def max_length(self): """Returns the length of the longest field value the term appears in. """ return self._maxlength
[docs] def max_weight(self): """Returns the number of times the term appears in the document in which it appears the most. """ return self._maxweight
[docs] def min_id(self): """Returns the lowest document ID this term appears in.""" return self._minid
[docs] def max_id(self): """Returns the highest document ID this term appears in.""" return self._maxid
# Reader base class
[docs]class IndexReader: """Do not instantiate this object directly. Instead use Index.reader().""" def __enter__(self): return self def __exit__(self, *args): self.close() @abstractmethod def __contains__(self, term): """Returns True if the given term tuple (fieldname, text) is in this reader. """ raise NotImplementedError
[docs] def codec(self): """Returns the :class:`whoosh.codec.base.Codec` object used to read this reader's segment. If this reader is not atomic (``reader.is_atomic() == True``), returns None. """ return None
[docs] def segment(self): """Returns the :class:`whoosh.index.Segment` object used by this reader. If this reader is not atomic (``reader.is_atomic() == True``), returns None. """ return None
[docs] def segments(self): """Returns a list of :class:`whoosh.index.Segment` objects used by this reader.""" return None
[docs] def storage(self): """Returns the :class:`whoosh.filedb.filestore.Storage` object used by this reader to read its files. If the reader is not atomic, (``reader.is_atomic() == True``), returns None. """ return None
def is_atomic(self): return True def _text_to_bytes(self, fieldname, text): if fieldname not in self.schema: raise TermNotFound((fieldname, text)) return self.schema[fieldname].to_bytes(text)
[docs] def close(self): """Closes the open files associated with this reader.""" pass
[docs] def generation(self): """Returns the generation of the index being read, or -1 if the backend is not versioned. """ return None
[docs] @abstractmethod def indexed_field_names(self): """Returns an iterable of strings representing the names of the indexed fields. This may include additional names not explicitly listed in the Schema if you use "glob" fields. """ raise NotImplementedError
[docs] @abstractmethod def all_terms(self): """Yields (fieldname, text) tuples for every term in the index.""" raise NotImplementedError
[docs] def terms_from(self, fieldname, prefix): """Yields (fieldname, text) tuples for every term in the index starting at the given prefix. """ # The default implementation just scans the whole list of terms for fname, text in self.all_terms(): if fname < fieldname or text < prefix: continue yield (fname, text)
[docs] @abstractmethod def term_info(self, fieldname, text): """Returns a :class:`TermInfo` object allowing access to various statistics about the given term. """ raise NotImplementedError
[docs] def expand_prefix(self, fieldname, prefix): """Yields terms in the given field that start with the given prefix.""" prefix = self._text_to_bytes(fieldname, prefix) for fn, text in self.terms_from(fieldname, prefix): if fn != fieldname or not text.startswith(prefix): return yield text
[docs] def lexicon(self, fieldname): """Yields all bytestrings in the given field.""" for fn, btext in self.terms_from(fieldname, emptybytes): if fn != fieldname: return yield btext
[docs] def field_terms(self, fieldname): """Yields all term values (converted from on-disk bytes) in the given field. """ from_bytes = self.schema[fieldname].from_bytes for btext in self.lexicon(fieldname): yield from_bytes(btext)
def __iter__(self): """Yields ((fieldname, text), terminfo) tuples for each term in the reader, in lexical order. """ term_info = self.term_info for term in self.all_terms(): yield (term, term_info(*term))
[docs] def iter_from(self, fieldname, text): """Yields ((fieldname, text), terminfo) tuples for all terms in the reader, starting at the given term. """ term_info = self.term_info text = self._text_to_bytes(fieldname, text) for term in self.terms_from(fieldname, text): yield (term, term_info(*term))
[docs] def iter_field(self, fieldname, prefix=""): """Yields (text, terminfo) tuples for all terms in the given field.""" prefix = self._text_to_bytes(fieldname, prefix) for (fn, text), terminfo in self.iter_from(fieldname, prefix): if fn != fieldname: return yield text, terminfo
[docs] def iter_prefix(self, fieldname, prefix): """Yields (text, terminfo) tuples for all terms in the given field with a certain prefix. """ prefix = self._text_to_bytes(fieldname, prefix) for text, terminfo in self.iter_field(fieldname, prefix): if not text.startswith(prefix): return yield (text, terminfo)
[docs] @abstractmethod def has_deletions(self): """Returns True if the underlying index/segment has deleted documents. """ raise NotImplementedError
[docs] def all_doc_ids(self): """Returns an iterator of all (undeleted) document IDs in the reader.""" is_deleted = self.is_deleted return ( docnum for docnum in range(self.doc_count_all()) if not is_deleted(docnum) )
[docs] def iter_docs(self): """Yields a series of ``(docnum, stored_fields_dict)`` tuples for the undeleted documents in the reader. """ for docnum in self.all_doc_ids(): yield docnum, self.stored_fields(docnum)
[docs] @abstractmethod def is_deleted(self, docnum): """Returns True if the given document number is marked deleted.""" raise NotImplementedError
[docs] @abstractmethod def stored_fields(self, docnum): """Returns the stored fields for the given document number. :param numerickeys: use field numbers as the dictionary keys instead of field names. """ raise NotImplementedError
[docs] def all_stored_fields(self): """Yields the stored fields for all non-deleted documents.""" is_deleted = self.is_deleted for docnum in range(self.doc_count_all()): if not is_deleted(docnum): yield self.stored_fields(docnum)
[docs] @abstractmethod def doc_count_all(self): """Returns the total number of documents, DELETED OR UNDELETED, in this reader. """ raise NotImplementedError
[docs] @abstractmethod def doc_count(self): """Returns the total number of UNDELETED documents in this reader.""" return self.doc_count_all() - self.deleted_count()
[docs] @abstractmethod def frequency(self, fieldname, text): """Returns the total number of instances of the given term in the collection. """ raise NotImplementedError
[docs] @abstractmethod def doc_frequency(self, fieldname, text): """Returns how many documents the given term appears in.""" raise NotImplementedError
[docs] @abstractmethod def field_length(self, fieldname): """Returns the total number of terms in the given field. This is used by some scoring algorithms. """ raise NotImplementedError
[docs] @abstractmethod def min_field_length(self, fieldname): """Returns the minimum length of the field across all documents. This is used by some scoring algorithms. """ raise NotImplementedError
[docs] @abstractmethod def max_field_length(self, fieldname): """Returns the minimum length of the field across all documents. This is used by some scoring algorithms. """ raise NotImplementedError
[docs] @abstractmethod def doc_field_length(self, docnum, fieldname, default=0): """Returns the number of terms in the given field in the given document. This is used by some scoring algorithms. """ raise NotImplementedError
[docs] def first_id(self, fieldname, text): """Returns the first ID in the posting list for the given term. This may be optimized in certain backends. """ text = self._text_to_bytes(fieldname, text) p = self.postings(fieldname, text) if p.is_active(): return p.id() raise TermNotFound((fieldname, text))
[docs] def iter_postings(self): """Low-level method, yields all postings in the reader as ``(fieldname, text, docnum, weight, valuestring)`` tuples. """ for fieldname, btext in self.all_terms(): m = self.postings(fieldname, btext) while m.is_active(): yield (fieldname, btext, m.id(), m.weight(), m.value()) m.next()
[docs] @abstractmethod def postings(self, fieldname, text): """Returns a :class:`~whoosh.matching.Matcher` for the postings of the given term. >>> pr = reader.postings("content", "render") >>> pr.skip_to(10) >>> pr.id 12 :param fieldname: the field name or field number of the term. :param text: the text of the term. :rtype: :class:`whoosh.matching.Matcher` """ raise NotImplementedError
[docs] @abstractmethod def has_vector(self, docnum, fieldname): """Returns True if the given document has a term vector for the given field. """ raise NotImplementedError
[docs] @abstractmethod def vector(self, docnum, fieldname, format_=None): """Returns a :class:`~whoosh.matching.Matcher` object for the given term vector. >>> docnum = searcher.document_number(path=u'/a/b/c') >>> v = searcher.vector(docnum, "content") >>> v.all_as("frequency") [(u"apple", 3), (u"bear", 2), (u"cab", 2)] :param docnum: the document number of the document for which you want the term vector. :param fieldname: the field name or field number of the field for which you want the term vector. :rtype: :class:`whoosh.matching.Matcher` """ raise NotImplementedError
[docs] def vector_as(self, astype, docnum, fieldname): """Returns an iterator of (termtext, value) pairs for the terms in the given term vector. This is a convenient shortcut to calling vector() and using the Matcher object when all you want are the terms and/or values. >>> docnum = searcher.document_number(path=u'/a/b/c') >>> searcher.vector_as("frequency", docnum, "content") [(u"apple", 3), (u"bear", 2), (u"cab", 2)] :param docnum: the document number of the document for which you want the term vector. :param fieldname: the field name or field number of the field for which you want the term vector. :param astype: a string containing the name of the format you want the term vector's data in, for example "weights". """ vec = self.vector(docnum, fieldname) if astype == "weight": while vec.is_active(): yield (vec.id(), vec.weight()) vec.next() else: format_ = self.schema[fieldname].format decoder = format_.decoder(astype) while vec.is_active(): yield (vec.id(), decoder(vec.value())) vec.next()
[docs] def corrector(self, fieldname): """Returns a :class:`whoosh.spelling.Corrector` object that suggests corrections based on the terms in the given field. """ from whoosh.spelling import ReaderCorrector fieldobj = self.schema[fieldname] return ReaderCorrector(self, fieldname, fieldobj)
[docs] def terms_within(self, fieldname, text, maxdist, prefix=0): """ Returns a generator of words in the given field within ``maxdist`` Damerau-Levenshtein edit distance of the given text. Important: the terms are returned in **no particular order**. The only criterion is that they are within ``maxdist`` edits of ``text``. You may want to run this method multiple times with increasing ``maxdist`` values to ensure you get the closest matches first. You may also have additional information (such as term frequency or an acoustic matching algorithm) you can use to rank terms with the same edit distance. :param maxdist: the maximum edit distance. :param prefix: require suggestions to share a prefix of this length with the given word. This is often justifiable since most misspellings do not involve the first letter of the word. Using a prefix dramatically decreases the time it takes to generate the list of words. :param seen: an optional set object. Words that appear in the set will not be yielded. """ fieldobj = self.schema[fieldname] for btext in self.expand_prefix(fieldname, text[:prefix]): word = fieldobj.from_bytes(btext) k = distance(word, text, limit=maxdist) if k <= maxdist: yield word
[docs] def most_frequent_terms(self, fieldname, number=5, prefix=""): """Returns the top 'number' most frequent terms in the given field as a list of (frequency, text) tuples. """ gen = ( (terminfo.weight(), text) for text, terminfo in self.iter_prefix(fieldname, prefix) ) return nlargest(number, gen)
[docs] def most_distinctive_terms(self, fieldname, number=5, prefix=""): """Returns the top 'number' terms with the highest `tf*idf` scores as a list of (score, text) tuples. """ N = float(self.doc_count()) gen = ( (terminfo.weight() * log(N / terminfo.doc_frequency()), text) for text, terminfo in self.iter_prefix(fieldname, prefix) ) return nlargest(number, gen)
[docs] def leaf_readers(self): """Returns a list of (IndexReader, docbase) pairs for the child readers of this reader if it is a composite reader. If this is not a composite reader, it returns `[(self, 0)]`. """ return [(self, 0)]
def supports_caches(self): return False def has_column(self, fieldname): return False
[docs] def column_reader(self, fieldname, column=None, reverse=False, translate=False): """ :param fieldname: the name of the field for which to get a reader. :param column: if passed, use this Column object instead of the one associated with the field in the Schema. :param reverse: if passed, reverses the order of keys returned by the reader's ``sort_key()`` method. If the column type is not reversible, this will raise a ``NotImplementedError``. :param translate: if True, wrap the reader to call the field's ``from_bytes()`` method on the returned values. :return: a :class:`whoosh.columns.ColumnReader` object. """ raise NotImplementedError
# Segment-based reader class SegmentReader(IndexReader): def __init__(self, storage, schema, segment, generation=None, codec=None): self.schema = schema self.is_closed = False self._segment = segment self._segid = self._segment.segment_id() self._gen = generation # self.files is a storage object from which to load the segment files. # This is different from the general storage (which will be used for # caches) if the segment is in a compound file. if segment.is_compound(): # Open the compound file as a storage object files = segment.open_compound_file(storage) # Use an overlay here instead of just the compound storage, in rare # circumstances a segment file may be added after the segment is # written self._storage = OverlayStorage(files, storage) else: self._storage = storage # Get subreaders from codec self._codec = codec if codec else segment.codec() self._terms = self._codec.terms_reader(self._storage, segment) self._perdoc = self._codec.per_document_reader(self._storage, segment) def codec(self): return self._codec def segment(self): return self._segment def segments(self): return [self.segment()] def storage(self): return self._storage def has_deletions(self): if self.is_closed: raise ReaderClosed return self._perdoc.has_deletions() def doc_count(self): if self.is_closed: raise ReaderClosed return self._perdoc.doc_count() def doc_count_all(self): if self.is_closed: raise ReaderClosed return self._perdoc.doc_count_all() def is_deleted(self, docnum): if self.is_closed: raise ReaderClosed return self._perdoc.is_deleted(docnum) def generation(self): return self._gen def __repr__(self): return f"{self.__class__.__name__}({self._storage!r}, {self._segment!r})" def __contains__(self, term): if self.is_closed: raise ReaderClosed fieldname, text = term if fieldname not in self.schema: return False text = self._text_to_bytes(fieldname, text) return (fieldname, text) in self._terms def close(self): if self.is_closed: raise ReaderClosed("Reader already closed") self._terms.close() self._perdoc.close() # It's possible some weird codec that doesn't use storage might have # passed None instead of a storage object if self._storage: self._storage.close() self.is_closed = True def stored_fields(self, docnum): if self.is_closed: raise ReaderClosed assert docnum >= 0 schema = self.schema sfs = self._perdoc.stored_fields(docnum) # Double-check with schema to filter out removed fields return dict(item for item in sfs.items() if item[0] in schema) # Delegate doc methods to the per-doc reader def all_doc_ids(self): if self.is_closed: raise ReaderClosed return self._perdoc.all_doc_ids() def iter_docs(self): if self.is_closed: raise ReaderClosed return self._perdoc.iter_docs() def all_stored_fields(self): if self.is_closed: raise ReaderClosed return self._perdoc.all_stored_fields() def field_length(self, fieldname): if self.is_closed: raise ReaderClosed return self._perdoc.field_length(fieldname) def min_field_length(self, fieldname): if self.is_closed: raise ReaderClosed return self._perdoc.min_field_length(fieldname) def max_field_length(self, fieldname): if self.is_closed: raise ReaderClosed return self._perdoc.max_field_length(fieldname) def doc_field_length(self, docnum, fieldname, default=0): if self.is_closed: raise ReaderClosed return self._perdoc.doc_field_length(docnum, fieldname, default) def has_vector(self, docnum, fieldname): if self.is_closed: raise ReaderClosed return self._perdoc.has_vector(docnum, fieldname) # def _test_field(self, fieldname): if self.is_closed: raise ReaderClosed if fieldname not in self.schema: raise TermNotFound(f"No field {fieldname!r}") if self.schema[fieldname].format is None: raise TermNotFound(f"Field {fieldname!r} is not indexed") def indexed_field_names(self): return self._terms.indexed_field_names() def all_terms(self): if self.is_closed: raise ReaderClosed schema = self.schema return ( (fieldname, text) for fieldname, text in self._terms.terms() if fieldname in schema ) def terms_from(self, fieldname, prefix): self._test_field(fieldname) prefix = self._text_to_bytes(fieldname, prefix) schema = self.schema return ( (fname, text) for fname, text in self._terms.terms_from(fieldname, prefix) if fname in schema ) def term_info(self, fieldname, text): self._test_field(fieldname) text = self._text_to_bytes(fieldname, text) try: return self._terms.term_info(fieldname, text) except KeyError: raise TermNotFound(f"{fieldname}:{text!r}") def expand_prefix(self, fieldname, prefix): self._test_field(fieldname) prefix = self._text_to_bytes(fieldname, prefix) return IndexReader.expand_prefix(self, fieldname, prefix) def lexicon(self, fieldname): self._test_field(fieldname) return IndexReader.lexicon(self, fieldname) def __iter__(self): if self.is_closed: raise ReaderClosed schema = self.schema return ( (term, terminfo) for term, terminfo in self._terms.items() if term[0] in schema ) def iter_from(self, fieldname, text): self._test_field(fieldname) schema = self.schema text = self._text_to_bytes(fieldname, text) for term, terminfo in self._terms.items_from(fieldname, text): if term[0] not in schema: continue yield (term, terminfo) def frequency(self, fieldname, text): self._test_field(fieldname) text = self._text_to_bytes(fieldname, text) try: return self._terms.frequency(fieldname, text) except KeyError: return 0 def doc_frequency(self, fieldname, text): self._test_field(fieldname) text = self._text_to_bytes(fieldname, text) try: return self._terms.doc_frequency(fieldname, text) except KeyError: return 0 @cached_property def deleted_docs_set(self): return frozenset(self._perdoc.deleted_docs()) def postings(self, fieldname, text, scorer=None): from whoosh.matching.wrappers import FilterMatcher if self.is_closed: raise ReaderClosed if fieldname not in self.schema: raise TermNotFound(f"No field {fieldname!r}") text = self._text_to_bytes(fieldname, text) format_ = self.schema[fieldname].format matcher = self._terms.matcher(fieldname, text, format_, scorer=scorer) deleted = self.deleted_docs_set if deleted: matcher = FilterMatcher(matcher, deleted, exclude=True) return matcher def vector(self, docnum, fieldname, format_=None): if self.is_closed: raise ReaderClosed if fieldname not in self.schema: raise TermNotFound(f"No field {fieldname!r}") vformat = format_ or self.schema[fieldname].vector if not vformat: raise Exception(f"No vectors are stored for field {fieldname!r}") return self._perdoc.vector(docnum, fieldname, vformat) def cursor(self, fieldname): if self.is_closed: raise ReaderClosed fieldobj = self.schema[fieldname] return self._terms.cursor(fieldname, fieldobj) def terms_within(self, fieldname, text, maxdist, prefix=0): # Replaces the horribly inefficient base implementation with one based # on skipping through the word list efficiently using a DFA fieldobj = self.schema[fieldname] spellfield = fieldobj.spelling_fieldname(fieldname) auto = self._codec.automata(self._storage, self._segment) fieldcur = self.cursor(spellfield) return auto.terms_within(fieldcur, text, maxdist, prefix) # Column methods def has_column(self, fieldname): if self.is_closed: raise ReaderClosed coltype = self.schema[fieldname].column_type return coltype and self._perdoc.has_column(fieldname) def column_reader(self, fieldname, column=None, reverse=False, translate=True): if self.is_closed: raise ReaderClosed fieldobj = self.schema[fieldname] column = column or fieldobj.column_type if not column: raise Exception(f"No column for field {fieldname!r} in {self!r}") if self._perdoc.has_column(fieldname): creader = self._perdoc.column_reader(fieldname, column) if reverse: creader.set_reverse() else: # This segment doesn't have a column file for this field, so create # a fake column reader that always returns the default value. default = column.default_value(reverse) creader = columns.EmptyColumnReader(default, self.doc_count_all()) if translate: # Wrap the column in a Translator to give the caller # nice values instead of sortable representations fcv = fieldobj.from_column_value creader = columns.TranslatingColumnReader(creader, fcv) return creader # Fake IndexReader class for empty indexes class EmptyReader(IndexReader): def __init__(self, schema): self.schema = schema def __contains__(self, term): return False def __iter__(self): return iter([]) def segments(self): return None def cursor(self, fieldname): from whoosh.codec.base import EmptyCursor return EmptyCursor() def indexed_field_names(self): return [] def all_terms(self): return iter([]) def term_info(self, fieldname, text): raise TermNotFound((fieldname, text)) def iter_from(self, fieldname, text): return iter([]) def iter_field(self, fieldname, prefix=""): return iter([]) def iter_prefix(self, fieldname, prefix=""): return iter([]) def lexicon(self, fieldname): return iter([]) def has_deletions(self): return False def is_deleted(self, docnum): return False def stored_fields(self, docnum): raise KeyError(f"No document number {docnum}") def all_stored_fields(self): return iter([]) def doc_count_all(self): return 0 def doc_count(self): return 0 def frequency(self, fieldname, text): return 0 def doc_frequency(self, fieldname, text): return 0 def field_length(self, fieldname): return 0 def min_field_length(self, fieldname): return 0 def max_field_length(self, fieldname): return 0 def doc_field_length(self, docnum, fieldname, default=0): return default def postings(self, fieldname, text, scorer=None): raise TermNotFound(f"{fieldname}:{text!r}") def has_vector(self, docnum, fieldname): return False def vector(self, docnum, fieldname, format_=None): raise KeyError(f"No document number {docnum}") def most_frequent_terms(self, fieldname, number=5, prefix=""): return iter([]) def most_distinctive_terms(self, fieldname, number=5, prefix=None): return iter([]) # Multisegment reader class
[docs]class MultiReader(IndexReader): """Do not instantiate this object directly. Instead use Index.reader().""" def __init__(self, readers, generation=None): self.readers = readers self._gen = generation self.schema = None if readers: self.schema = readers[0].schema self.doc_offsets = [] self.base = 0 for r in self.readers: self.doc_offsets.append(self.base) self.base += r.doc_count_all() self.is_closed = False def _document_segment(self, docnum): return max(0, bisect_right(self.doc_offsets, docnum) - 1) def _segment_and_docnum(self, docnum): segmentnum = self._document_segment(docnum) offset = self.doc_offsets[segmentnum] return segmentnum, docnum - offset def cursor(self, fieldname): return MultiCursor([r.cursor(fieldname) for r in self.readers]) def segments(self): return [ reader.segment() for reader in self.readers if reader.segment() is not None ] def is_atomic(self): return False def leaf_readers(self): return list(zip(self.readers, self.doc_offsets)) def add_reader(self, reader): self.readers.append(reader) self.doc_offsets.append(self.base) self.base += reader.doc_count_all() def close(self): for d in self.readers: d.close() self.is_closed = True def generation(self): return self._gen def format(self, fieldname): for r in self.readers: fmt = r.format(fieldname) if fmt is not None: return fmt def vector_format(self, fieldname): for r in self.readers: vfmt = r.vector_format(fieldname) if vfmt is not None: return vfmt # Term methods def __contains__(self, term): return any(r.__contains__(term) for r in self.readers) def _merge_terms(self, iterlist): # Merge-sorts terms coming from a list of term iterators. # Create a map so we can look up each iterator by its id() value itermap = {} for it in iterlist: itermap[id(it)] = it # Fill in the list with the head term from each iterator. current = [] for it in iterlist: try: term = next(it) except StopIteration: continue current.append((term, id(it))) # Number of active iterators active = len(current) # If only one iterator is active, just yield from it and return if active == 1: term, itid = current[0] it = itermap[itid] yield term for term in it: yield term return # Otherwise, do a streaming heap sort of the terms from the iterators heapify(current) while active: # Peek at the first term in the sorted list term = current[0][0] # Re-iterate on all items in the list that have that term while active and current[0][0] == term: it = itermap[current[0][1]] try: nextterm = next(it) heapreplace(current, (nextterm, id(it))) except StopIteration: heappop(current) active -= 1 # Yield the term yield term def indexed_field_names(self): names = set() for r in self.readers: names.update(r.indexed_field_names()) return iter(names) def all_terms(self): return self._merge_terms([r.all_terms() for r in self.readers]) def terms_from(self, fieldname, prefix): return self._merge_terms( [r.terms_from(fieldname, prefix) for r in self.readers] ) def term_info(self, fieldname, text): term = (fieldname, text) # Get the term infos for the sub-readers containing the term tis = [ (r.term_info(fieldname, text), offset) for r, offset in zip(self.readers, self.doc_offsets) if term in r ] # If only one reader had the term, return its terminfo with the offset # added if not tis: raise TermNotFound(term) return combine_terminfos(tis) def frequency(self, fieldname, text): return sum(r.frequency(fieldname, text) for r in self.readers) def doc_frequency(self, fieldname, text): return sum(r.doc_frequency(fieldname, text) for r in self.readers) def postings(self, fieldname, text): # This method does not add a scorer; for that, use Searcher.postings() postreaders = [] docoffsets = [] term = (fieldname, text) for i, r in enumerate(self.readers): if term in r: offset = self.doc_offsets[i] pr = r.postings(fieldname, text) postreaders.append(pr) docoffsets.append(offset) if not postreaders: raise TermNotFound(fieldname, text) return MultiMatcher(postreaders, docoffsets) def first_id(self, fieldname, text): for i, r in enumerate(self.readers): try: id = r.first_id(fieldname, text) except (KeyError, TermNotFound): pass else: if id is None: raise TermNotFound((fieldname, text)) else: return self.doc_offsets[i] + id raise TermNotFound((fieldname, text)) # Deletion methods def has_deletions(self): return any(r.has_deletions() for r in self.readers) def is_deleted(self, docnum): segmentnum, segmentdoc = self._segment_and_docnum(docnum) return self.readers[segmentnum].is_deleted(segmentdoc) def stored_fields(self, docnum): segmentnum, segmentdoc = self._segment_and_docnum(docnum) return self.readers[segmentnum].stored_fields(segmentdoc) # Columns def has_column(self, fieldname): return any(r.has_column(fieldname) for r in self.readers) def column_reader(self, fieldname, column=None, reverse=False, translate=True): crs = [] doc_offsets = [] for i, r in enumerate(self.readers): if r.has_column(fieldname): cr = r.column_reader( fieldname, column=column, reverse=reverse, translate=translate ) crs.append(cr) doc_offsets.append(self.doc_offsets[i]) return columns.MultiColumnReader(crs, doc_offsets) # Per doc methods def all_stored_fields(self): for reader in self.readers: yield from reader.all_stored_fields() def doc_count_all(self): return sum(dr.doc_count_all() for dr in self.readers) def doc_count(self): return sum(dr.doc_count() for dr in self.readers) def field_length(self, fieldname): return sum(dr.field_length(fieldname) for dr in self.readers) def min_field_length(self, fieldname): return min(r.min_field_length(fieldname) for r in self.readers) def max_field_length(self, fieldname): return max(r.max_field_length(fieldname) for r in self.readers) def doc_field_length(self, docnum, fieldname, default=0): segmentnum, segmentdoc = self._segment_and_docnum(docnum) reader = self.readers[segmentnum] return reader.doc_field_length(segmentdoc, fieldname, default=default) def has_vector(self, docnum, fieldname): segmentnum, segmentdoc = self._segment_and_docnum(docnum) return self.readers[segmentnum].has_vector(segmentdoc, fieldname) def vector(self, docnum, fieldname, format_=None): segmentnum, segmentdoc = self._segment_and_docnum(docnum) return self.readers[segmentnum].vector(segmentdoc, fieldname) def vector_as(self, astype, docnum, fieldname): segmentnum, segmentdoc = self._segment_and_docnum(docnum) return self.readers[segmentnum].vector_as(astype, segmentdoc, fieldname)
def combine_terminfos(tis): if len(tis) == 1: ti, offset = tis[0] ti._minid += offset ti._maxid += offset return ti # Combine the various statistics w = sum(ti.weight() for ti, _ in tis) df = sum(ti.doc_frequency() for ti, _ in tis) ml = min(ti.min_length() for ti, _ in tis) xl = max(ti.max_length() for ti, _ in tis) xw = max(ti.max_weight() for ti, _ in tis) # For min and max ID, we need to add the doc offsets mid = min(ti.min_id() + offset for ti, offset in tis) xid = max(ti.max_id() + offset for ti, offset in tis) return TermInfo(w, df, ml, xl, xw, mid, xid) class MultiCursor: def __init__(self, cursors): self._cursors = [c for c in cursors if c.is_valid()] self._low = [] self._text = None self.next() def _find_low(self): low = [] lowterm = None for c in self._cursors: if c.is_valid(): cterm = c.term() if low and cterm == lowterm: low.append(c) elif low and cterm < lowterm: low = [c] lowterm = cterm self._low = low self._text = lowterm return lowterm def first(self): for c in self._cursors: c.first() return self._find_low() def find(self, term): for c in self._cursors: c.find(term) return self._find_low() def next(self): for c in self._cursors: c.next() return self._find_low() def term_info(self): tis = [c.term_info() for c in self._low] return combine_terminfos(tis) if tis else None def is_valid(self): return any(c.is_valid() for c in self._cursors)