Source code for whoosh.analysis.filters

# Copyright 2007 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

from itertools import chain

from whoosh.analysis.acore import Composable
from whoosh.util.text import rcompile

# Default list of stop words (words so common it's usually wasteful to index
# them). This list is used by the StopFilter class, which allows you to supply
# an optional list to override this one.

STOP_WORDS = frozenset(
    (
        "a",
        "an",
        "and",
        "are",
        "as",
        "at",
        "be",
        "by",
        "can",
        "for",
        "from",
        "have",
        "if",
        "in",
        "is",
        "it",
        "may",
        "not",
        "of",
        "on",
        "or",
        "tbd",
        "that",
        "the",
        "this",
        "to",
        "us",
        "we",
        "when",
        "will",
        "with",
        "yet",
        "you",
        "your",
    )
)

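# A hedged usage sketch (editorial, not part of the original module): because
# StopFilter (defined below) takes a ``stoplist`` argument, this default set
# can be extended or replaced. RegexTokenizer and LowercaseFilter live in
# other whoosh.analysis modules, and the output shown is illustrative.
#
# >>> my_stops = STOP_WORDS | frozenset(["foo"])
# >>> ana = RegexTokenizer() | LowercaseFilter() | StopFilter(stoplist=my_stops)
# >>> [t.text for t in ana(u"the foo is a bar")]
# ["bar"]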

# Simple pattern for matching URLs; may be useful as a tokenizer expression
# (see the sketch below)

url_pattern = rcompile(
    """
(
    [A-Za-z+]+://          # URL protocol
    \\S+?                  # URL body
    (?=\\s|[.]\\s|$|[.]$)  # Stop at space/end, or a dot followed by space/end
) | (                      # or...
    \\w+([:.]?\\w+)*         # word characters, with opt. internal colons/dots
)
""",
    verbose=True,
)

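# A hedged sketch (editorial, not part of the original module): RegexTokenizer
# (defined in whoosh.analysis.tokenizers) accepts a compiled pattern, so
# url_pattern can be used as a tokenizer expression that keeps URLs together
# as single tokens. The output shown is illustrative.
#
# >>> ana = RegexTokenizer(url_pattern) | LowercaseFilter()
# >>> [t.text for t in ana(u"see http://example.com/page for details")]
# ["see", "http://example.com/page", "for", "details"]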

# Filters


class Filter(Composable):
    """Base class for Filter objects. A Filter subclass must implement a
    filter() method that takes a single argument, which is an iterator of Token
    objects, and yield a series of Token objects in return.

    Filters that do morphological transformation of tokens (e.g. stemming)
    should set their ``is_morph`` attribute to True.
    """

    def __eq__(self, other):
        return (
            other
            and self.__class__ is other.__class__
            and self.__dict__ == other.__dict__
        )

    def __ne__(self, other):
        return not self == other

    def __call__(self, tokens):
        raise NotImplementedError

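# Illustrative sketch (editorial, not part of the original module): the Filter
# docstring above describes the subclass protocol -- override __call__(),
# consume the incoming token iterator, and yield (usually mutated) tokens.
# The class name and behaviour below are hypothetical; RegexTokenizer in the
# doctest comes from whoosh.analysis.tokenizers.
class TruncateTextFilter(Filter):
    """Example filter that truncates token text to a fixed maximum length.

    >>> ana = RegexTokenizer() | TruncateTextFilter(maxlen=3)
    >>> [token.text for token in ana("hello there")]
    ["hel", "the"]
    """

    def __init__(self, maxlen=8):
        self.maxlen = maxlen

    def __call__(self, tokens):
        for t in tokens:
            t.text = t.text[: self.maxlen]
            yield t
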

class PassFilter(Filter):
    """An identity filter: passes the tokens through untouched."""

    def __call__(self, tokens):
        return tokens

class LoggingFilter(Filter):
    """Prints the contents of every token that passes through as a debug
    log entry.
    """

    def __init__(self, logger=None):
        """
        :param logger: the logger to use. If omitted, the "whoosh.analysis"
            logger is used.
        """

        if logger is None:
            import logging

            logger = logging.getLogger("whoosh.analysis")
        self.logger = logger

    def __call__(self, tokens):
        logger = self.logger
        for t in tokens:
            logger.debug(repr(t))
            yield t
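
# A hedged usage sketch (editorial, not part of the original module): with the
# standard logging module configured for DEBUG output, LoggingFilter can be
# dropped into an analyzer chain to trace tokens during development.
# RegexTokenizer and LowercaseFilter live in other whoosh.analysis modules.
#
# import logging
# logging.basicConfig(level=logging.DEBUG)
# ana = RegexTokenizer() | LoggingFilter() | LowercaseFilter()
# for t in ana(u"Hello World"):
#     pass  # each token is logged to the "whoosh.analysis" logger as it passes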

class MultiFilter(Filter):
    """Chooses one of two or more sub-filters based on the 'mode' attribute
    of the token stream.
    """

    default_filter = PassFilter()

    def __init__(self, **kwargs):
        """Use keyword arguments to associate mode attribute values with
        instantiated filters.

        >>> iwf_for_index = IntraWordFilter(mergewords=True, mergenums=False)
        >>> iwf_for_query = IntraWordFilter(mergewords=False, mergenums=False)
        >>> mf = MultiFilter(index=iwf_for_index, query=iwf_for_query)

        This class expects that the value of the mode attribute is consistent
        among all tokens in a token stream.
        """

        self.filters = kwargs

    def __eq__(self, other):
        return (
            other
            and self.__class__ is other.__class__
            and self.filters == other.filters
        )

    def __call__(self, tokens):
        # Only selects on the first token
        t = next(tokens)
        selected_filter = self.filters.get(t.mode, self.default_filter)
        return selected_filter(chain([t], tokens))
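
# A hedged sketch (editorial, not part of the original module) continuing the
# MultiFilter docstring example: the selected sub-filter depends on the
# ``mode`` keyword passed when the analyzer is called, which the tokenizer
# copies onto each token. RegexTokenizer, IntraWordFilter and LowercaseFilter
# live in other whoosh.analysis modules.
#
# ana = RegexTokenizer(r"\S+") | mf | LowercaseFilter()
# index_tokens = ana(u"Wi-Fi", mode="index")  # routed through iwf_for_index
# query_tokens = ana(u"Wi-Fi", mode="query")  # routed through iwf_for_query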

class TeeFilter(Filter):
    r"""Interleaves the results of two or more filters (or filter chains).

    NOTE: because it needs to create copies of each token for each sub-filter,
    this filter is quite slow.

    >>> target = "ALFA BRAVO CHARLIE"
    >>> # In one branch, we'll lower-case the tokens
    >>> f1 = LowercaseFilter()
    >>> # In the other branch, we'll reverse the tokens
    >>> f2 = ReverseTextFilter()
    >>> ana = RegexTokenizer(r"\S+") | TeeFilter(f1, f2)
    >>> [token.text for token in ana(target)]
    ["alfa", "AFLA", "bravo", "OVARB", "charlie", "EILRAHC"]

    To combine the incoming token stream with the output of a filter chain,
    use ``TeeFilter`` and make one of the filters a :class:`PassFilter`.

    >>> f1 = PassFilter()
    >>> f2 = BiWordFilter()
    >>> ana = RegexTokenizer(r"\S+") | TeeFilter(f1, f2) | LowercaseFilter()
    >>> [token.text for token in ana(target)]
    ["alfa", "alfa-bravo", "bravo", "bravo-charlie", "charlie"]
    """

    def __init__(self, *filters):
        if len(filters) < 2:
            raise ValueError("TeeFilter requires two or more filters")
        self.filters = filters

    def __eq__(self, other):
        return self.__class__ is other.__class__ and self.filters == other.filters

    def __call__(self, tokens):
        from itertools import tee

        count = len(self.filters)
        # Tee the token iterator and wrap each teed iterator with the
        # corresponding filter
        gens = [
            filter(t.copy() for t in gen)
            for filter, gen in zip(self.filters, tee(tokens, count))
        ]
        # Keep a count of the number of running iterators
        running = count
        while running:
            for i, gen in enumerate(gens):
                if gen is not None:
                    try:
                        yield next(gen)
                    except StopIteration:
                        gens[i] = None
                        running -= 1

class ReverseTextFilter(Filter):
    """Reverses the text of each token.

    >>> ana = RegexTokenizer() | ReverseTextFilter()
    >>> [token.text for token in ana("hello there")]
    ["olleh", "ereht"]
    """

    def __call__(self, tokens):
        for t in tokens:
            t.text = t.text[::-1]
            yield t

class LowercaseFilter(Filter):
    """Uses unicode.lower() to lowercase token text.

    >>> rext = RegexTokenizer()
    >>> stream = rext("This is a TEST")
    >>> [token.text for token in LowercaseFilter()(stream)]
    ["this", "is", "a", "test"]
    """

    def __call__(self, tokens):
        for t in tokens:
            t.text = t.text.lower()
            yield t

class StripFilter(Filter):
    """Calls unicode.strip() on the token text."""

    def __call__(self, tokens):
        for t in tokens:
            t.text = t.text.strip()
            yield t

class StopFilter(Filter):
    """Marks "stop" words (words too common to index) in the stream (and by
    default removes them).

    Make sure you precede this filter with a :class:`LowercaseFilter`.

    >>> stopper = RegexTokenizer() | StopFilter()
    >>> [token.text for token in stopper(u"this is a test")]
    ["test"]
    >>> es_stopper = RegexTokenizer() | StopFilter(lang="es")
    >>> [token.text for token in es_stopper(u"el lapiz es en la mesa")]
    ["lapiz", "mesa"]

    The list of available languages is in `whoosh.lang.languages`.
    You can use :func:`whoosh.lang.has_stopwords` to check if a given language
    has a stop word list available.
    """

    def __init__(
        self, stoplist=STOP_WORDS, minsize=2, maxsize=None, renumber=True, lang=None
    ):
        """
        :param stoplist: A collection of words to remove from the stream.
            This is converted to a frozenset. The default is a list of
            common English stop words.
        :param minsize: The minimum length of token texts. Tokens with
            text smaller than this will be stopped. The default is 2.
        :param maxsize: The maximum length of token texts. Tokens with text
            larger than this will be stopped. Use None to allow any length.
        :param renumber: Change the 'pos' attribute of unstopped tokens to
            reflect their position with the stopped words removed.
        :param lang: Automatically get a list of stop words for the given
            language
        """

        stops = set()
        if stoplist:
            stops.update(stoplist)
        if lang:
            from whoosh.lang import stopwords_for_language

            stops.update(stopwords_for_language(lang))

        self.stops = frozenset(stops)
        self.min = minsize
        self.max = maxsize
        self.renumber = renumber

    def __eq__(self, other):
        return (
            other
            and self.__class__ is other.__class__
            and self.stops == other.stops
            and self.min == other.min
            and self.renumber == other.renumber
        )

    def __call__(self, tokens):
        stoplist = self.stops
        minsize = self.min
        maxsize = self.max
        renumber = self.renumber

        pos = None
        for t in tokens:
            text = t.text
            if (
                len(text) >= minsize
                and (maxsize is None or len(text) <= maxsize)
                and text not in stoplist
            ):
                # This is not a stop word
                if renumber and t.positions:
                    if pos is None:
                        pos = t.pos
                    else:
                        pos += 1
                        t.pos = pos
                t.stopped = False
                yield t
            else:
                # This is a stop word
                if not t.removestops:
                    # This IS a stop word, but we're not removing them
                    t.stopped = True
                    yield t
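
# A hedged behaviour sketch (editorial, not part of the original module): when
# the analyzer is called with removestops=False, stop words stay in the stream
# and are only marked via the ``stopped`` attribute (the else branch above).
# RegexTokenizer and LowercaseFilter live in other whoosh.analysis modules;
# the output shown is illustrative.
#
# >>> ana = RegexTokenizer() | LowercaseFilter() | StopFilter()
# >>> [(t.text, t.stopped) for t in ana(u"this is a test", removestops=False)]
# [("this", True), ("is", True), ("a", True), ("test", False)]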

class CharsetFilter(Filter):
    """Translates the text of tokens by calling unicode.translate() using the
    supplied character mapping object. This is useful for case and accent
    folding.

    The ``whoosh.support.charset`` module has a useful map for accent folding.

    >>> from whoosh.support.charset import accent_map
    >>> retokenizer = RegexTokenizer()
    >>> chfilter = CharsetFilter(accent_map)
    >>> [t.text for t in chfilter(retokenizer(u'café'))]
    [u'cafe']

    Another way to get a character mapping object is to convert a Sphinx
    charset table file using
    :func:`whoosh.support.charset.charset_table_to_dict`.

    >>> from whoosh.support.charset import charset_table_to_dict
    >>> from whoosh.support.charset import default_charset
    >>> retokenizer = RegexTokenizer()
    >>> charmap = charset_table_to_dict(default_charset)
    >>> chfilter = CharsetFilter(charmap)
    >>> [t.text for t in chfilter(retokenizer(u'Stra\\xdfe'))]
    [u'strase']

    The Sphinx charset table format is described at
    http://www.sphinxsearch.com/docs/current.html#conf-charset-table.
    """

    __inittypes__ = {"charmap": dict}

    def __init__(self, charmap):
        """
        :param charmap: a dictionary mapping from integer character numbers to
            unicode characters, as required by the unicode.translate() method.
        """

        self.charmap = charmap

    def __eq__(self, other):
        return (
            other
            and self.__class__ is other.__class__
            and self.charmap == other.charmap
        )

    def __call__(self, tokens):
        assert hasattr(tokens, "__iter__")
        charmap = self.charmap
        for t in tokens:
            t.text = t.text.translate(charmap)
            yield t

class DelimitedAttributeFilter(Filter):
    """Looks for delimiter characters in the text of each token and stores the
    data after the delimiter in a named attribute on the token.

    The defaults are set up to use the ``^`` character as a delimiter and store
    the value after the ``^`` as the boost for the token.

    >>> daf = DelimitedAttributeFilter(delimiter="^", attribute="boost")
    >>> ana = RegexTokenizer("\\\\S+") | DelimitedAttributeFilter()
    >>> for t in ana(u("image render^2 file^0.5")):
    ...     print("%r %f" % (t.text, t.boost))
    'image' 1.0
    'render' 2.0
    'file' 0.5

    Note that you need to make sure your tokenizer includes the delimiter and
    data as part of the token!
    """

    def __init__(self, delimiter="^", attribute="boost", default=1.0, type=float):
        """
        :param delimiter: a string that, when present in a token's text,
            separates the actual text from the "data" payload.
        :param attribute: the name of the attribute in which to store the
            data on the token.
        :param default: the value to use for the attribute for tokens that
            don't have delimited data.
        :param type: the type of the data, for example ``str`` or ``float``.
            This is used to convert the string value of the data before
            storing it in the attribute.
        """

        self.delim = delimiter
        self.attr = attribute
        self.default = default
        self.type = type

    def __eq__(self, other):
        return (
            other
            and self.__class__ is other.__class__
            and self.delim == other.delim
            and self.attr == other.attr
            and self.default == other.default
        )

    def __call__(self, tokens):
        delim = self.delim
        attr = self.attr
        default = self.default
        type_ = self.type

        for t in tokens:
            text = t.text
            pos = text.find(delim)
            if pos > -1:
                setattr(t, attr, type_(text[pos + 1 :]))
                if t.chars:
                    t.endchar -= len(t.text) - pos
                t.text = text[:pos]
            else:
                setattr(t, attr, default)

            yield t

class SubstitutionFilter(Filter):
    """Performs a regular expression substitution on the token text.

    This is especially useful for removing text from tokens, for example
    hyphens::

        ana = RegexTokenizer(r"\\S+") | SubstitutionFilter("-", "")

    Because it has the full power of the re.sub() method behind it, this
    filter can perform some fairly complex transformations. For example, to
    take tokens like ``'a=b', 'c=d', 'e=f'`` and change them to ``'b=a',
    'd=c', 'f=e'``::

        # Analyzer that swaps the text on either side of an equal sign
        rt = RegexTokenizer(r"\\S+")
        sf = SubstitutionFilter("([^=]*)=(.*)", r"\\2=\\1")
        ana = rt | sf
    """

    def __init__(self, pattern, replacement):
        """
        :param pattern: a pattern string or compiled regular expression object
            describing the text to replace.
        :param replacement: the substitution text.
        """

        self.pattern = rcompile(pattern)
        self.replacement = replacement

    def __eq__(self, other):
        return (
            other
            and self.__class__ is other.__class__
            and self.pattern == other.pattern
            and self.replacement == other.replacement
        )

    def __call__(self, tokens):
        pattern = self.pattern
        replacement = self.replacement
        for t in tokens:
            t.text = pattern.sub(replacement, t.text)
            yield t