Source code for mincepy.expr

# -*- coding: utf-8 -*-
"""Query expressions"""
import abc
import copy
from typing import Union, List, Iterable

__all__ = (
    "Expr",
    "WithListOperand",
    "Empty",
    "Operator",
    "Eq",
    "Gt",
    "Gte",
    "In",
    "Lt",
    "Lte",
    "Ne",
    "Nin",
    "Comparison",
    "Logical",
    "And",
    "Not",
    "Or",
    "Nor",
    "Exists",
    "Queryable",
    "WithQueryContext",
    "query_expr",
    "field_name",
    "build_expr",
    "Query",
)

import bson.regex


class FilterLike(metaclass=abc.ABCMeta):
    """An abstract base class for objects representing a pyos path, e.g. pyos.pathlib.PurePath."""

    # pylint: disable=too-few-public-methods

    @abc.abstractmethod
    def __query_expr__(self) -> dict:
        """Return the query filter representation of the object."""


FilterSpec = Union[dict, FilterLike]


[docs]class Expr(FilterLike, metaclass=abc.ABCMeta): """The base class for query expressions. Expressions are tuples containing an operator or a field as a first part and a value or expression as second""" __slots__ = ()
[docs] def dict(self): """Return the query dictionary for this expression""" return self.__query_expr__()
def __and__(self, other: "Expr") -> "And": if not isinstance(other, Expr): raise TypeError(f"Expected Expr got '{other}'") return And([self, other]) def __or__(self, other: "Expr") -> "Or": if not isinstance(other, Expr): raise TypeError(f"Expected Expr got '{other}'") return Or([self, other])
[docs]class WithListOperand(FilterLike): """Mixin for expressions that take an operand that is a list""" # pylint: disable=no-member, too-few-public-methods def __init__(self, operand: List[Expr]): if not isinstance(operand, list): raise TypeError(f"Expected a list, got {type(operand).__name__}") for entry in operand: if not isinstance(entry, Expr): raise TypeError( f"Expected a list of Expr, found {type(entry).__name__}" ) self.operand = operand def __query_expr__(self) -> dict: if len(self.operand) == 1: return query_expr(self.operand[0]) return {self.oper: list(map(query_expr, self.operand))}
[docs]class Empty(Expr): """The empty expression""" def __query_expr__(self) -> dict: return {}
# region Match
[docs]class Operator(Expr): # pylint: disable=abstract-method """Interface for operators"""
class SimpleOperator(Operator): """A simple operator expression. Consists of an operator applied to an operand which is to be matched """ __slots__ = ("value",) oper = None # type: str def __init__(self, value): self.value = value def __query_expr__(self) -> dict: return {self.oper: self.value}
[docs]class Eq(SimpleOperator): __slots__ = () oper = "$eq"
[docs]class Gt(SimpleOperator): __slots__ = () oper = "$gt"
[docs]class Gte(SimpleOperator): __slots__ = () oper = "$gte"
[docs]class In(SimpleOperator): __slots__ = () oper = "$in"
[docs]class Lt(SimpleOperator): __slots__ = () oper = "$lt"
[docs]class Lte(SimpleOperator): __slots__ = () oper = "$lte"
[docs]class Ne(SimpleOperator): __slots__ = () oper = "$ne"
[docs]class Nin(SimpleOperator): __slots__ = () oper = "$nin"
COMPARISON_OPERATORS = { oper_type.oper: oper_type for oper_type in SimpleOperator.__subclasses__() }
[docs]class Comparison(Expr): """A comparison expression consists of a field and an operator expression e.g. name == 'frank' where name is the field, the operator is ==, and the value is 'frank' """ __slots__ = "field", "expr" def __init__(self, field, expr: Operator): if field is None: raise ValueError("field cannot be None") if not isinstance(expr, Operator): raise TypeError( f"Expected an operator expression, got '{type(expr).__name__}'" ) self.field = field self.expr = expr def __query_expr__(self) -> dict: if isinstance(self.expr, Eq): # Special case for this query as it looks nicer this way (without using '$eq') return {field_name(self.field): self.expr.value} return {field_name(self.field): query_expr(self.expr)}
# endregion # region Logical operators
[docs]class Logical(Expr): """A comparison operation. Consists of an operator applied to an operand which is matched in a particular way""" __slots__ = ("operand",) oper = None # type: str def __init__(self, operand: Expr): if not isinstance(operand, Expr): raise TypeError(f"Expected an Expr, got '{type(operand).__name__}'") self.operand = operand def __query_expr__(self) -> dict: return {self.oper: query_expr(self.operand)}
[docs]class And(WithListOperand, Logical): __slots__ = () oper = "$and" def __and__(self, other: "Expr") -> "And": if isinstance(other, And): # Economise on Ands and fuse them here return And([*self.operand, *other.operand]) return super().__and__(other)
[docs]class Not(Logical): __slots__ = () oper = "$not"
[docs]class Or(WithListOperand, Logical): __slots__ = () oper = "$or" def __or__(self, other: "Expr") -> "Or": if isinstance(other, Or): # Economise on Ors and fuse them here return Or([*self.operand, *other.operand]) return super().__or__(other)
[docs]class Nor(WithListOperand, Logical): __slots__ = () oper = "$nor"
# endregion # region Element operators
[docs]class Exists(SimpleOperator): __slots__ = () oper = "$exists" def __init__(self, value: bool): if not isinstance(value, bool): raise ValueError("Exists can only be True or False") super().__init__(value)
# endregion # region Evaluation operators class Regex(Operator): __slots__ = "pattern", "options" oper = "$regex" def __init__(self, pattern: Union[str, bson.regex.Regex], options: str = None): if not isinstance(pattern, (str, bson.regex.Regex)): raise ValueError("Must supply regex string or bson Regex object") self.pattern = pattern self.options = options def __query_expr__(self) -> dict: """Construct the regex expression""" expr = {"$regex": self.pattern} if self.options: expr["$options"] = self.options return expr # endregion class Queryable(metaclass=abc.ABCMeta): # region Query operations __slots__ = () __hash__ = object.__hash__ def __eq__(self, other) -> Comparison: return Comparison(self.get_path(), Eq(other)) def __ne__(self, other) -> Comparison: return Comparison(self.get_path(), Ne(other)) def __gt__(self, other) -> Comparison: return Comparison(self.get_path(), Gt(other)) def __ge__(self, other) -> Comparison: return Comparison(self.get_path(), Gte(other)) def __lt__(self, other) -> Comparison: return Comparison(self.get_path(), Lt(other)) def __le__(self, other) -> Comparison: return Comparison(self.get_path(), Lte(other)) def in_(self, *possibilities) -> Comparison: return Comparison(self.get_path(), In(possibilities)) def nin_(self, *possibilities) -> Expr: return Comparison(self.get_path(), Nin(possibilities)) def exists_(self, value: bool = True) -> Expr: return Comparison(self.get_path(), Exists(value)) def regex_(self, pattern, options: str = None) -> Expr: return Comparison(self.get_path(), Regex(pattern, options)) def starts_with_(self, pattern, options: str = None) -> Expr: return self.regex_(f"^{pattern}", options) @abc.abstractmethod def get_path(self) -> str: """Get the path for this object in the document"""
[docs]class WithQueryContext: """A mixin for Queryable objects that allows a context to be added which is always 'anded' with the resulting query condition for any operator""" _query_context = None # pylint: disable=no-member def set_query_context(self, expr: Expr): self._query_context = expr def __eq__(self, other) -> Expr: return self._combine(super().__eq__(other)) def __ne__(self, other) -> Expr: return self._combine(super().__ne__(other)) def __gt__(self, other) -> Expr: return self._combine(super().__gt__(other)) def __ge__(self, other) -> Expr: return self._combine(super().__ge__(other)) def __lt__(self, other) -> Expr: return self._combine(super().__lt__(other)) def __le__(self, other) -> Expr: return self._combine(super().__le__(other)) def in_(self, *possibilities) -> Expr: return self._combine(super().in_(*possibilities)) def nin_(self, *possibilities) -> Expr: return self._combine(super().nin_(*possibilities)) def exists_(self, value: bool = True) -> Expr: return self._combine(super().exists_(value)) def _combine(self, expr: Expr) -> Expr: if self._query_context is None: return expr return And([self._query_context, expr])
[docs]def query_expr(filter: FilterLike) -> dict: # pylint: disable=redefined-builtin """Return a query specification (dict) If a dict is passed is is returned unaltered. Otherwise __qspec__() is called and its value is returned as long as it is a dict. In all other cases, TypeError is raised.""" if isinstance(filter, dict): return filter # Work from the object's type to match method resolution of other magic methods. try: query_repr = filter.__query_expr__() except AttributeError: raise TypeError( "expected dict or object with __query_expr__, not " + str(filter) ) from None if isinstance(query_repr, dict): return query_repr raise TypeError( f"expected {type(filter).__name__}.__query_expr__() to return dict, not {type(query_repr).__name__}" )
def field_name(field) -> str: if isinstance(field, str): return field try: name = field.__field_name__() except AttributeError: raise TypeError( f"expected str or object with __field__name__, not {field}" ) from None if isinstance(name, str): return name raise TypeError( f"expected {type(field).__name__}.__field_name__() to return str, not {type(name).__name__}" )
[docs]def build_expr(item) -> Expr: # noqa: C901 """Expression factory""" # pylint: disable=too-many-branches, too-many-return-statements if isinstance(item, Expr): return item if isinstance(item, dict): if not item: return Empty() if len(item) == 1: return build_expr(tuple(item.items())[0]) # Otherwise, a dictionary is an implicit 'and' return And(list(map(build_expr, item.items()))) if isinstance(item, tuple): if len(item) != 2: raise ValueError(f"Expecting tuple of length two, instead got {item}") first, second = item if first.startswith("$"): # Comparison operators try: oper = COMPARISON_OPERATORS[first] except KeyError: pass else: return oper(second) # Logical operators if first == "$and": return And(list(map(build_expr, second))) if first == "$not": return Not(build_expr(second)) if first == "$nor": return Nor(list(map(build_expr, second))) if first == "$or": return Or(list(map(build_expr, second))) # Element query if first == "$exists": return Exists(second) raise ValueError(f"Unknown operator '{item}'") # Must be a 'match' where the first is the field try: return Comparison(first, build_expr(second)) except (ValueError, TypeError): # pylint: disable=fixme # TODO: See if we can make this check safer # Assume second is a value type return Comparison(first, Eq(second)) try: return item.__expr__() except AttributeError: raise TypeError( "expected dict or object with __expr__, not " + type(item).__name__ ) from None
class Query: __slots__ = "_filter_expressions", "limit", "sort", "skip" def __init__( self, *expr: Expr, limit: int = None, sort: dict = None, skip: int = None ): self._filter_expressions = [] # type: List[Expr] self.extend(expr) self.limit = limit self.sort = sort self.skip = skip def __str__(self) -> str: return str(self.__dict__) def copy(self) -> "Query": return Query( *copy.copy(self._filter_expressions), limit=self.limit, sort=self.sort, skip=self.skip, ) @property def __dict__(self) -> dict: return dict( filter=self.get_filter(), sort=self.sort, limit=self.limit, skip=self.skip ) def append(self, expr: Expr): self._filter_expressions.append(build_expr(expr)) def extend(self, exprs: Iterable[Expr]): for entry in exprs: self.append(entry) def get_filter(self) -> dict: """Get the query filter as a dictionary""" if not self._filter_expressions: return {} return query_expr(And(self._filter_expressions))