Source code for whoosh.codec.base

# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

"""
This module contains base classes/interfaces for "codec" objects.
"""

from abc import abstractmethod
from bisect import bisect_right

from whoosh import columns
from whoosh.automata import lev
from whoosh.filedb.compound import CompoundStorage
from whoosh.system import emptybytes
from whoosh.util import random_name

# Exceptions


class OutOfOrderError(Exception):
    pass


# Base classes


class Codec:
    length_stats = True

    # Per document value writer

    @abstractmethod
    def per_document_writer(self, storage, segment):
        raise NotImplementedError

    # Inverted index writer

    @abstractmethod
    def field_writer(self, storage, segment):
        raise NotImplementedError

    # Postings

    @abstractmethod
    def postings_writer(self, dbfile, byteids=False):
        raise NotImplementedError

    @abstractmethod
    def postings_reader(self, dbfile, terminfo, format_, term=None, scorer=None):
        raise NotImplementedError

    # Index readers

    def automata(self, storage, segment):
        _ = storage, segment  # Unused arguments
        return Automata()

    @abstractmethod
    def terms_reader(self, storage, segment):
        raise NotImplementedError

    @abstractmethod
    def per_document_reader(self, storage, segment):
        raise NotImplementedError

    # Segments and generations

    @abstractmethod
    def new_segment(self, storage, indexname):
        raise NotImplementedError


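# Illustrative sketch (not part of the module): a concrete codec fills in the
# factory methods above. The ``My*`` names below are hypothetical stand-ins
# for real writer/reader/segment classes.
#
#   class MyCodec(Codec):
#       def per_document_writer(self, storage, segment):
#           return MyPerDocWriter(storage, segment)
#
#       def field_writer(self, storage, segment):
#           return MyFieldWriter(storage, segment)
#
#       def new_segment(self, storage, indexname):
#           return MySegment(indexname)

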
class WrappingCodec(Codec):
    def __init__(self, child):
        self._child = child

    def per_document_writer(self, storage, segment):
        return self._child.per_document_writer(storage, segment)

    def field_writer(self, storage, segment):
        return self._child.field_writer(storage, segment)

    def postings_writer(self, dbfile, byteids=False):
        return self._child.postings_writer(dbfile, byteids=byteids)

    def postings_reader(self, dbfile, terminfo, format_, term=None, scorer=None):
        return self._child.postings_reader(
            dbfile, terminfo, format_, term=term, scorer=scorer
        )

    def automata(self, storage, segment):
        return self._child.automata(storage, segment)

    def terms_reader(self, storage, segment):
        return self._child.terms_reader(storage, segment)

    def per_document_reader(self, storage, segment):
        return self._child.per_document_reader(storage, segment)

    def new_segment(self, storage, indexname):
        return self._child.new_segment(storage, indexname)


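# Usage sketch (illustrative): a wrapping codec overrides just the methods it
# wants to intercept and inherits delegation for everything else.
#
#   class LoggingCodec(WrappingCodec):
#       def postings_writer(self, dbfile, byteids=False):
#           print("opening postings writer for", dbfile)
#           return self._child.postings_writer(dbfile, byteids=byteids)

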
# Writer classes


class PerDocumentWriter:
    @abstractmethod
    def start_doc(self, docnum):
        raise NotImplementedError

    @abstractmethod
    def add_field(self, fieldname, fieldobj, value, length):
        raise NotImplementedError

    @abstractmethod
    def add_column_value(self, fieldname, columnobj, value):
        raise NotImplementedError("Codec does not implement writing columns")

    @abstractmethod
    def add_vector_items(self, fieldname, fieldobj, items):
        raise NotImplementedError

    def add_vector_matcher(self, fieldname, fieldobj, vmatcher):
        def readitems():
            while vmatcher.is_active():
                text = vmatcher.id()
                weight = vmatcher.weight()
                valuestring = vmatcher.value()
                yield (text, weight, valuestring)
                vmatcher.next()

        self.add_vector_items(fieldname, fieldobj, readitems())

    def finish_doc(self):
        # This method is intentionally left empty.
        pass

    def close(self):
        # This method is intentionally left empty.
        pass


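# Call-order sketch (illustrative): a per-document writer is driven once per
# document, roughly as follows. ``title_field`` stands in for a schema field
# object.
#
#   pdw = codec.per_document_writer(storage, segment)
#   pdw.start_doc(0)
#   pdw.add_field("title", title_field, "Hello", 1)
#   pdw.finish_doc()
#   pdw.close()

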
class FieldWriter:
    def add_postings(self, schema, lengths, items):
        # This method translates a generator of
        # (fieldname, btext, docnum, weight, value) postings into calls to
        # start_field(), start_term(), add(), finish_term(), finish_field(),
        # etc.
        start_field = self.start_field
        start_term = self.start_term
        add = self.add
        finish_term = self.finish_term
        finish_field = self.finish_field

        if lengths:
            dfl = lengths.doc_field_length
        else:
            dfl = lambda docnum, fieldname: 0

        # The fieldname of the previous posting
        lastfn = None
        # The bytes text of the previous posting
        lasttext = None
        # The (fieldname, btext) of the previous spelling posting
        lastspell = None
        # The field object for the current field
        fieldobj = None
        for fieldname, btext, docnum, weight, value in items:
            # Check for out-of-order postings. This is convoluted because
            # Python 3 removed the ability to compare a string to None
            if lastfn is not None and fieldname < lastfn:
                raise OutOfOrderError(f"Field {lastfn!r} .. {fieldname!r}")
            if fieldname == lastfn and lasttext and btext < lasttext:
                raise OutOfOrderError(
                    f"Term {lastfn}:{lasttext!r} .. {fieldname}:{btext!r}"
                )

            # If the fieldname of this posting is different from the last one,
            # tell the writer we're starting a new field
            if fieldname != lastfn:
                if lasttext is not None:
                    finish_term()
                if lastfn is not None and fieldname != lastfn:
                    finish_field()
                fieldobj = schema[fieldname]
                start_field(fieldname, fieldobj)
                lastfn = fieldname
                lasttext = None

            # HACK: items where docnum == -1 indicate words that should be
            # added to the spelling graph, not the postings
            if docnum == -1:
                # spellterm = (fieldname, btext)
                # # There can be duplicates of spelling terms, so only add a
                # # spell term if it's greater than the last one
                # if lastspell is None or spellterm > lastspell:
                #     spellword = fieldobj.from_bytes(btext)
                #     self.add_spell_word(fieldname, spellword)
                #     lastspell = spellterm
                continue

            # If this term is different from the term in the previous posting,
            # tell the writer to start a new term
            if btext != lasttext:
                if lasttext is not None:
                    finish_term()
                start_term(btext)
                lasttext = btext

            # Add this posting
            length = dfl(docnum, fieldname)
            if value is None:
                value = emptybytes
            add(docnum, weight, value, length)

        if lasttext is not None:
            finish_term()
        if lastfn is not None:
            finish_field()

    @abstractmethod
    def start_field(self, fieldname, fieldobj):
        raise NotImplementedError

    @abstractmethod
    def start_term(self, text):
        raise NotImplementedError

    @abstractmethod
    def add(self, docnum, weight, vbytes, length):
        raise NotImplementedError

    def add_spell_word(self, fieldname, text):
        raise NotImplementedError

    @abstractmethod
    def finish_term(self):
        raise NotImplementedError

    def finish_field(self):
        # This method is intentionally left empty.
        pass

    def close(self):
        pass


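# Input-shape sketch (illustrative): add_postings() expects postings sorted by
# (fieldname, btext, docnum); out-of-order items raise OutOfOrderError.
#
#   items = [
#       ("content", b"hello", 0, 1.0, b""),
#       ("content", b"hello", 3, 2.0, b""),
#       ("content", b"world", 0, 1.0, b""),
#       ("title", b"hello", 1, 1.0, b""),
#   ]
#   fwriter.add_postings(schema, lengths, iter(items))

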
# Postings
class PostingsWriter:
    @abstractmethod
    def start_postings(self, format_, terminfo):
        raise NotImplementedError

    @abstractmethod
    def add_posting(self, id_, weight, vbytes, length=None):
        raise NotImplementedError

    def finish_postings(self):
        # This method is intentionally left empty.
        pass

    @abstractmethod
    def written(self):
        """Returns True if this object has already written to disk."""

        raise NotImplementedError


# Reader classes


class FieldCursor:
    def first(self):
        raise NotImplementedError

    def find(self, string):
        raise NotImplementedError

    def next(self):
        raise NotImplementedError

    def term(self):
        raise NotImplementedError


class TermsReader:
    @abstractmethod
    def __contains__(self, term):
        raise NotImplementedError

    @abstractmethod
    def cursor(self, fieldname, fieldobj):
        raise NotImplementedError

    @abstractmethod
    def terms(self):
        raise NotImplementedError

    @abstractmethod
    def terms_from(self, fieldname, prefix):
        raise NotImplementedError

    @abstractmethod
    def items(self):
        raise NotImplementedError

    @abstractmethod
    def items_from(self, fieldname, prefix):
        raise NotImplementedError

    @abstractmethod
    def term_info(self, fieldname, text):
        raise NotImplementedError

    # These two have working default implementations in terms of term_info(),
    # so they are not abstract; concrete readers may override them with
    # something more efficient

    def frequency(self, fieldname, text):
        return self.term_info(fieldname, text).weight()

    def doc_frequency(self, fieldname, text):
        return self.term_info(fieldname, text).doc_frequency()

    @abstractmethod
    def matcher(self, fieldname, text, format_, scorer=None):
        raise NotImplementedError

    @abstractmethod
    def indexed_field_names(self):
        raise NotImplementedError

    def close(self):
        # This method is intentionally left empty.
        pass


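# Usage sketch (illustrative): given a concrete TermsReader, per-term
# statistics come from term_info() and the default frequency helpers.
#
#   ti = treader.term_info("content", b"hello")
#   print(ti.weight(), ti.doc_frequency())
#   print(treader.frequency("content", b"hello"))  # same as ti.weight()

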
class Automata:
    @staticmethod
    def levenshtein_dfa(uterm, maxdist, prefix=0):
        return lev.levenshtein_automaton(uterm, maxdist, prefix).to_dfa()

    @staticmethod
    def find_matches(dfa, cur):
        unull = chr(0)

        term = cur.text()
        if term is None:
            return

        match = dfa.next_valid_string(term)
        while match:
            cur.find(match)
            term = cur.text()
            if term is None:
                return

            if match == term:
                yield match
                term += unull

            match = dfa.next_valid_string(term)

    def terms_within(self, fieldcur, uterm, maxdist, prefix=0):
        dfa = self.levenshtein_dfa(uterm, maxdist, prefix)
        return self.find_matches(dfa, fieldcur)


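# Usage sketch (illustrative): find indexed terms within edit distance 1 of
# "hello" that share its first two characters. ``fieldcur`` is assumed to be
# a FieldCursor over the field's terms.
#
#   automata = Automata()
#   for term in automata.terms_within(fieldcur, "hello", maxdist=1, prefix=2):
#       print(term)

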
# Per-doc value reader


class PerDocumentReader:
    def close(self):
        # This method is intentionally left empty.
        pass

    @abstractmethod
    def doc_count(self):
        raise NotImplementedError

    @abstractmethod
    def doc_count_all(self):
        raise NotImplementedError

    # Deletions

    @abstractmethod
    def has_deletions(self):
        raise NotImplementedError

    @abstractmethod
    def is_deleted(self, docnum):
        raise NotImplementedError

    @abstractmethod
    def deleted_docs(self):
        raise NotImplementedError

    def all_doc_ids(self):
        """
        Returns an iterator of all (undeleted) document IDs in the reader.
        """

        is_deleted = self.is_deleted
        return (
            docnum
            for docnum in range(self.doc_count_all())
            if not is_deleted(docnum)
        )

    def iter_docs(self):
        for docnum in self.all_doc_ids():
            yield docnum, self.stored_fields(docnum)

    # Columns

    def supports_columns(self):
        return False

    def has_column(self, fieldname):
        _ = fieldname  # Unused argument
        return False

    def list_columns(self):
        raise NotImplementedError

    # Don't need to override this if supports_columns() returns False
    def column_reader(self, fieldname, column):
        raise NotImplementedError

    # Bitmaps

    def field_docs(self, fieldname):
        _ = fieldname  # Unused argument
        return None

    # Lengths

    @abstractmethod
    def doc_field_length(self, docnum, fieldname, default=0):
        raise NotImplementedError

    @abstractmethod
    def field_length(self, fieldname):
        raise NotImplementedError

    @abstractmethod
    def min_field_length(self, fieldname):
        raise NotImplementedError

    @abstractmethod
    def max_field_length(self, fieldname):
        raise NotImplementedError

    # Vectors

    def has_vector(self, docnum, fieldname):
        _ = docnum, fieldname  # Unused arguments
        return False

    # Don't need to override this if has_vector() always returns False
    def vector(self, docnum, fieldname, format_):
        raise NotImplementedError

    # Stored

    @abstractmethod
    def stored_fields(self, docnum):
        raise NotImplementedError

    def all_stored_fields(self):
        for docnum in self.all_doc_ids():
            yield self.stored_fields(docnum)


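# Usage sketch (illustrative): iterate a reader's live documents and their
# stored fields, skipping deletions.
#
#   for docnum, stored in pdreader.iter_docs():
#       print(docnum, stored.get("title"))

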
# Segment base class
class Segment:
    """Do not instantiate this object directly. It is used by the Index
    object to hold information about a segment. A list of objects of this
    class is pickled as part of the TOC file.

    The TOC file stores a minimal amount of information -- mostly a list of
    Segment objects. Segments are the real reverse indexes. Having multiple
    segments allows quick incremental indexing: just create a new segment for
    the new documents, and have the index overlay the new segment over
    previous ones for purposes of reading/search. "Optimizing" the index
    combines the contents of existing segments into one (removing any deleted
    documents along the way).
    """

    # Extension for compound segment files
    COMPOUND_EXT = ".seg"

    # self.indexname
    # self.segid

    def __init__(self, indexname):
        self.indexname = indexname
        self.segid = self._random_id()
        self.compound = False

    @classmethod
    def _random_id(cls, size=16):
        return random_name(size=size)

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.segment_id()}>"

    def __eq__(self, other):
        return (
            isinstance(other, type(self))
            and self.segment_id() == other.segment_id()
        )

    def __hash__(self):
        return hash(self.segment_id())

    def codec(self):
        raise NotImplementedError

    def index_name(self):
        return self.indexname

    def segment_id(self):
        if hasattr(self, "name"):
            # Old segment class
            return self.name
        else:
            return f"{self.index_name()}_{self.segid}"

    def is_compound(self):
        if not hasattr(self, "compound"):
            return False
        return self.compound

    # File convenience methods

    def make_filename(self, ext):
        return f"{self.segment_id()}{ext}"

    def list_files(self, storage):
        prefix = f"{self.segment_id()}."
        return [name for name in storage.list() if name.startswith(prefix)]

    def create_file(self, storage, ext, **kwargs):
        """Convenience method to create a new file in the given storage named
        with this segment's ID and the given extension. Any keyword arguments
        are passed to the storage's create_file method.
        """

        fname = self.make_filename(ext)
        return storage.create_file(fname, **kwargs)

    def open_file(self, storage, ext, **kwargs):
        """Convenience method to open a file in the given storage named with
        this segment's ID and the given extension. Any keyword arguments are
        passed to the storage's open_file method.
        """

        fname = self.make_filename(ext)
        return storage.open_file(fname, **kwargs)

    def create_compound_file(self, storage):
        segfiles = self.list_files(storage)
        assert not any(name.endswith(self.COMPOUND_EXT) for name in segfiles)
        cfile = self.create_file(storage, self.COMPOUND_EXT)
        CompoundStorage.assemble(cfile, storage, segfiles)

        for name in segfiles:
            storage.delete_file(name)
        self.compound = True

    def open_compound_file(self, storage):
        name = self.make_filename(self.COMPOUND_EXT)
        dbfile = storage.open_file(name)
        return CompoundStorage(dbfile, use_mmap=storage.supports_mmap)

    # Abstract methods

    @abstractmethod
    def doc_count_all(self):
        """
        Returns the total number of documents, DELETED OR UNDELETED, in this
        segment.
        """

        raise NotImplementedError

    def doc_count(self):
        """
        Returns the number of (undeleted) documents in this segment.
        """

        return self.doc_count_all() - self.deleted_count()

    def set_doc_count(self, doccount):
        raise NotImplementedError

    def has_deletions(self):
        """
        Returns True if any documents in this segment are deleted.
        """

        return self.deleted_count() > 0

    @abstractmethod
    def deleted_count(self):
        """
        Returns the total number of deleted documents in this segment.
        """

        raise NotImplementedError

    @abstractmethod
    def deleted_docs(self):
        raise NotImplementedError

    @abstractmethod
    def delete_document(self, docnum, delete=True):
        """Deletes the given document number. The document is not actually
        removed from the index until it is optimized.

        :param docnum: The document number to delete.
        :param delete: If False, this undeletes a deleted document.
        """

        raise NotImplementedError

    @abstractmethod
    def is_deleted(self, docnum):
        """
        Returns True if the given document number is deleted.
        """

        raise NotImplementedError

    def should_assemble(self):
        return True


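# Filename sketch (illustrative): a segment's files are named from its ID plus
# an extension. ``MySegment`` is a hypothetical concrete subclass; "MAIN" is
# just an example index name.
#
#   seg = MySegment("MAIN")
#   seg.segment_id()            # -> "MAIN_<16-char random id>"
#   seg.make_filename(".trm")   # -> "MAIN_<16-char random id>.trm"

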
# Wrapping Segment


class WrappingSegment(Segment):
    def __init__(self, child):
        self._child = child

    def codec(self):
        return self._child.codec()

    def index_name(self):
        return self._child.index_name()

    def segment_id(self):
        return self._child.segment_id()

    def is_compound(self):
        return self._child.is_compound()

    def should_assemble(self):
        return self._child.should_assemble()

    def make_filename(self, ext):
        return self._child.make_filename(ext)

    def list_files(self, storage):
        return self._child.list_files(storage)

    def create_file(self, storage, ext, **kwargs):
        return self._child.create_file(storage, ext, **kwargs)

    def open_file(self, storage, ext, **kwargs):
        return self._child.open_file(storage, ext, **kwargs)

    def create_compound_file(self, storage):
        return self._child.create_compound_file(storage)

    def open_compound_file(self, storage):
        return self._child.open_compound_file(storage)

    def delete_document(self, docnum, delete=True):
        return self._child.delete_document(docnum, delete=delete)

    def has_deletions(self):
        return self._child.has_deletions()

    def deleted_count(self):
        return self._child.deleted_count()

    def deleted_docs(self):
        return self._child.deleted_docs()

    def is_deleted(self, docnum):
        return self._child.is_deleted(docnum)

    def set_doc_count(self, doccount):
        self._child.set_doc_count(doccount)

    def doc_count(self):
        return self._child.doc_count()

    def doc_count_all(self):
        return self._child.doc_count_all()


# Multi per doc reader


class MultiPerDocumentReader(PerDocumentReader):
    def __init__(self, readers, offset=0):
        self._readers = readers

        self._doc_offsets = []
        self._doccount = 0
        for pdr in readers:
            self._doc_offsets.append(self._doccount)
            self._doccount += pdr.doc_count_all()

        self.is_closed = False

    def close(self):
        for r in self._readers:
            r.close()
        self.is_closed = True

    def doc_count_all(self):
        return self._doccount

    def doc_count(self):
        total = 0
        for r in self._readers:
            total += r.doc_count()
        return total

    def _document_reader(self, docnum):
        return max(0, bisect_right(self._doc_offsets, docnum) - 1)

    def _reader_and_docnum(self, docnum):
        rnum = self._document_reader(docnum)
        offset = self._doc_offsets[rnum]
        return rnum, docnum - offset

    # Deletions

    def has_deletions(self):
        return any(r.has_deletions() for r in self._readers)

    def is_deleted(self, docnum):
        x, y = self._reader_and_docnum(docnum)
        return self._readers[x].is_deleted(y)

    def deleted_docs(self):
        for r, offset in zip(self._readers, self._doc_offsets):
            for docnum in r.deleted_docs():
                yield docnum + offset

    def all_doc_ids(self):
        for r, offset in zip(self._readers, self._doc_offsets):
            for docnum in r.all_doc_ids():
                yield docnum + offset

    # Columns

    def has_column(self, fieldname):
        return any(r.has_column(fieldname) for r in self._readers)

    def column_reader(self, fieldname, column):
        if not self.has_column(fieldname):
            raise ValueError(f"No column {fieldname!r}")

        default = column.default_value()
        colreaders = []
        for r in self._readers:
            if r.has_column(fieldname):
                cr = r.column_reader(fieldname, column)
            else:
                cr = columns.EmptyColumnReader(default, r.doc_count_all())
            colreaders.append(cr)

        if len(colreaders) == 1:
            return colreaders[0]
        else:
            return columns.MultiColumnReader(colreaders)

    # Lengths

    def doc_field_length(self, docnum, fieldname, default=0):
        x, y = self._reader_and_docnum(docnum)
        return self._readers[x].doc_field_length(y, fieldname, default)

    def field_length(self, fieldname):
        total = 0
        for r in self._readers:
            total += r.field_length(fieldname)
        return total

    # These take the fieldname to match the PerDocumentReader interface and
    # pass it down to the sub-readers
    def min_field_length(self, fieldname):
        return min(r.min_field_length(fieldname) for r in self._readers)

    def max_field_length(self, fieldname):
        return max(r.max_field_length(fieldname) for r in self._readers)


# Extended base classes


class PerDocWriterWithColumns(PerDocumentWriter):
    def __init__(self):
        PerDocumentWriter.__init__(self)
        # Implementations need to set these attributes
        self._storage = None
        self._segment = None
        self._docnum = None

    @abstractmethod
    def _has_column(self, fieldname):
        raise NotImplementedError

    @abstractmethod
    def _create_column(self, fieldname, column):
        raise NotImplementedError

    @abstractmethod
    def _get_column(self, fieldname):
        raise NotImplementedError

    def add_column_value(self, fieldname, column, value):
        if not self._has_column(fieldname):
            self._create_column(fieldname, column)
        self._get_column(fieldname).add(self._docnum, value)


# FieldCursor implementations


class EmptyCursor(FieldCursor):
    def first(self):
        return None

    def find(self, term):
        return None

    def next(self):
        return None

    def text(self):
        return None

    def term_info(self):
        return None

    def is_valid(self):
        return False
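

# Offset sketch (illustrative): with two sub-readers of 10 and 5 documents,
# _doc_offsets is [0, 10], so global docnum 12 maps to sub-reader 1, local
# docnum 2. ``reader_a`` and ``reader_b`` are hypothetical concrete readers.
#
#   mpdr = MultiPerDocumentReader([reader_a, reader_b])
#   mpdr._reader_and_docnum(12)   # -> (1, 2)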