# Copyright 2007 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.
import random
import sys
import time
from bisect import insort
from functools import wraps
# These must be valid separate characters in CASE-INSENSITIVE filenames
IDCHARS = "0123456789abcdefghijklmnopqrstuvwxyz"

# ``now`` is the module's clock function. Prefer the high-resolution
# ``time.perf_counter`` when the running interpreter provides it; otherwise
# fall back to ``time.clock`` on Windows and ``time.time`` everywhere else.
now = getattr(time, "perf_counter", None)
if now is None:
    now = time.clock if sys.platform == "win32" else time.time
def random_name(size=28):
    """Return a random string of ``size`` characters drawn from ``IDCHARS``.

    Each character is picked independently with ``random.choice``.
    """
    picked = [random.choice(IDCHARS) for _ in range(size)]
    return "".join(picked)
def random_bytes(size=28):
    """Return a ``bytes`` object holding ``size`` random byte values.

    Each byte is drawn independently with ``random.randint(0, 255)``.
    """
    values = [random.randint(0, 255) for _ in range(size)]
    return bytes(values)
def make_binary_tree(fn, args, **kwargs):
    """Fold a list of arguments into a binary tree of ``fn`` results.

    ``fn`` is a function or class taking two positional arguments. The list
    is split at its midpoint (``len(args) // 2``), each half is reduced
    recursively, and the two results are combined with ``fn``.

    >>> make_binary_tree(UnionMatcher, [matcher1, matcher2, matcher3])
    UnionMatcher(matcher1, UnionMatcher(matcher2, matcher3))

    Any keyword arguments given to this function are passed along on every
    call to ``fn``.

    A single-element list returns that element unchanged; an empty list
    raises ``ValueError``.
    """
    size = len(args)
    if size == 0:
        raise ValueError("Called make_binary_tree with empty list")
    if size == 1:
        return args[0]

    middle = size // 2
    # Build the left subtree before the right one so ``fn`` is invoked in
    # left-to-right order.
    left = make_binary_tree(fn, args[:middle], **kwargs)
    right = make_binary_tree(fn, args[middle:], **kwargs)
    return fn(left, right, **kwargs)
def make_weighted_tree(fn, ls, **kwargs):
    """Combine weighted arguments into a Huffman-like tree of ``fn`` results.

    ``fn`` is a function or class taking two positional arguments and ``ls``
    is a list of ``(weight, argument)`` tuples. The two lowest-weighted
    entries are repeatedly removed and replaced by
    ``fn(lighter, heavier, **kwargs)`` carrying the sum of their weights,
    until a single result remains.

    Entries with equal weight are taken in the order they were supplied
    (merged nodes come after existing entries of the same weight); the
    arguments themselves are never compared, so they do not need to be
    orderable.

    Any keyword arguments given to this function are passed along on every
    call to ``fn``. The list passed in is not modified.

    A single-entry list returns that entry's argument unchanged; an empty
    list raises ``ValueError``.
    """
    if not ls:
        raise ValueError("Called make_weighted_tree with empty list")

    # Work on a private queue so the caller's list is left untouched. Each
    # entry is (weight, sequence number, node); the unique sequence number
    # settles weight ties, so tuple comparison never reaches the node.
    queue = sorted((pair[0], order, pair[1]) for order, pair in enumerate(ls))
    next_order = len(queue)

    while len(queue) > 1:
        weight_a, _, node_a = queue.pop(0)
        weight_b, _, node_b = queue.pop(0)
        merged = fn(node_a, node_b, **kwargs)
        insort(queue, (weight_a + weight_b, next_order, merged))
        next_order += 1

    return queue[0][2]
# Fibonacci function
# Memo of already computed values, keyed by n (only n > 2 is stored).
_fib_cache = {}


def fib(n):
    """Returns the nth value in the Fibonacci sequence.

    The sequence is indexed so that ``fib(1) == 1`` and ``fib(2) == 2``
    (giving 1, 2, 3, 5, 8, ...). Any ``n <= 2``, including zero and negative
    numbers, is returned unchanged. ``n`` is expected to be an integer.

    Results are memoized in the module-level ``_fib_cache``. The value is
    computed iteratively, so large ``n`` does not hit the interpreter's
    recursion limit.
    """
    if n <= 2:
        return n
    if n in _fib_cache:
        return _fib_cache[n]

    # Walk down to the highest index below n for which both it and its
    # predecessor are known (indexes <= 2 are always known), so earlier
    # results are reused instead of recomputed from the start.
    start = n - 1
    while start > 2 and not (
        start in _fib_cache and (start == 3 or start - 1 in _fib_cache)
    ):
        start -= 1

    current = start if start <= 2 else _fib_cache[start]
    before = start - 1
    previous = before if before <= 2 else _fib_cache[before]

    # Step forward to n, caching every intermediate value on the way.
    for index in range(start + 1, n + 1):
        previous, current = current, previous + current
        _fib_cache[index] = current
    return current
# Decorators
def synchronized(func):
    """Decorator that runs a method while holding the object's lock.

    The wrapped method's object must provide a ``_sync_lock`` attribute that
    can be used as a context manager; it is entered before ``func`` is called
    and exited when the call finishes, whether it returns or raises.
    """

    @wraps(func)
    def synchronized_wrapper(self, *args, **kwargs):
        lock = self._sync_lock
        with lock:
            result = func(self, *args, **kwargs)
            return result

    return synchronized_wrapper
def unclosed(method):
    """Decorator that refuses to run a method on a closed object.

    The wrapper reads ``self.closed`` before each call; when it is truthy a
    ``ValueError`` is raised instead of calling ``method``.
    """

    @wraps(method)
    def unclosed_wrapper(self, *args, **kwargs):
        if not self.closed:
            return method(self, *args, **kwargs)
        raise ValueError("Operation on a closed object")

    return unclosed_wrapper