import abc
from typing import Generic, TypeVar, NamedTuple, Sequence, Union, Mapping, Iterable, Dict, \
Iterator, Any, Type, Optional # pylint: disable=unused-import
from . import qops as q
from .records import DataRecord, SnapshotId
from . import operations
# Names exported by ``from <module> import *``
__all__ = ('Archive', 'BaseArchive', 'ASCENDING', 'DESCENDING')

# Type variable standing for the ID type used by a concrete archive
IdT = TypeVar('IdT')

# Sort options
ASCENDING = 1
DESCENDING = -1
class Archive(Generic[IdT], metaclass=abc.ABCMeta):
    """An archive provides the persistent storage for the historian.  It is responsible for
    storing, searching and loading data records and their metadata.

    This is an abstract interface; apart from :meth:`get_types` every method must be supplied
    by a concrete subclass.
    """

    # pylint: disable=too-many-public-methods

    # A directed edge between two snapshots in a reference graph
    RefEdge = NamedTuple('RefEdge', [('source', SnapshotId[IdT]), ('target', SnapshotId[IdT])])
    # A reference graph is expressed as an iterable of its edges
    RefGraph = Iterable[RefEdge]
    # An object id paired with its metadata dictionary
    MetaEntry = NamedTuple('MetaEntry', [('obj_id', IdT), ('meta', dict)])

    @classmethod
    def get_types(cls) -> Sequence:
        """This method allows the archive to return either types or type helpers that the
        historian should support.  A common example is the type helper for the object id type.

        The default implementation returns an empty tuple.
        """
        return tuple()

    @classmethod
    @abc.abstractmethod
    def get_id_type(cls) -> Type[IdT]:
        """Get the type used as an ID by this archive"""

    @abc.abstractmethod
    def create_archive_id(self) -> IdT:
        """Create a new archive id"""

    @abc.abstractmethod
    def construct_archive_id(self, value) -> IdT:
        """If it's possible, construct an archive id from the passed value.

        This is useful as a convenience to the user if, say, the archive id can be
        constructed from a string.

        :param value: the value to build the archive id from
        :raises TypeError: if an id cannot be constructed from this type of value
        :raises ValueError: if an id cannot be constructed from this particular value
        """

    @abc.abstractmethod
    def create_file(self, filename: Optional[str] = None, encoding: Optional[str] = None):
        """Create a new file object specific for this archive type

        :param filename: optional name for the file
        :param encoding: optional encoding for the file
        """

    @abc.abstractmethod
    def save(self, record: DataRecord):
        """Save a data record to the archive"""

    @abc.abstractmethod
    def save_many(self, records: Sequence[DataRecord]):
        """Save many data records to the archive"""

    @abc.abstractmethod
    def bulk_write(self, ops: Sequence[operations.Operation]):
        """Make a collection of write operations to the database"""

    # region Metadata

    # endregion

    @abc.abstractmethod
    def load(self, reference: SnapshotId[IdT]) -> DataRecord:
        """Load a snapshot of an object with the given reference"""

    @abc.abstractmethod
    def history(self, obj_id: IdT, idx_or_slice) -> Union[DataRecord, Sequence[DataRecord]]:
        """Load the snapshot records for a particular object, can return a single or multiple
        records

        :param obj_id: the id of the object whose history is requested
        :param idx_or_slice: an index or a slice selecting which snapshots to load
        """

    @abc.abstractmethod
    def get_snapshot_refs(self, obj_id: IdT) -> Sequence[SnapshotId[IdT]]:
        """Returns a list of time ordered snapshot references"""

    # pylint: disable=too-many-arguments
    @abc.abstractmethod
    def find(self,
             obj_id: Union[IdT, Iterable[IdT], Dict] = None,
             type_id=None,
             created_by=None,
             copied_from=None,
             version=None,
             state=None,
             state_types=None,
             snapshot_hash=None,
             meta=None,
             limit=0,
             sort=None,
             skip=0) -> Iterator[DataRecord]:
        """Find records matching the given criteria

        :param obj_id: an optional restriction on the object ids to search. This can be either:

            1. a single object id
            2. an iterable of object ids, which is treated as {'$in': list(obj_ids)}
            3. a general query filter to be applied to the object ids
        :param type_id: the type id to look for
        :param created_by: find records with the given ``created_by`` value
        :param copied_from: find records copied from the record with the given id
        :param version: restrict the search to this version, -1 for latest
        :param state: find objects with this state filter
        :param state_types: find objects with this state types filter
        :param snapshot_hash: find objects with this snapshot hash
        :param meta: find objects with this meta filter
        :param limit: limit the results to this many records
        :param sort: sort the results by the given criteria
        :param skip: skip this many entries
        """

    @abc.abstractmethod
    def count(self,
              obj_id=None,
              type_id=None,
              created_by=None,
              copied_from=None,
              version=-1,
              state=None,
              snapshot_hash=None,
              meta=None,
              limit=0):
        """Count the number of entries that match the given query

        The parameters share their names with those of :meth:`find`.  Note that ``version``
        defaults to -1 here whereas it defaults to ``None`` in :meth:`find`.
        """

    @abc.abstractmethod
    def get_reference_graph(self, sids: Sequence[SnapshotId[IdT]]) -> 'Sequence[Archive.RefGraph]':
        """Given one or more snapshot ids the archive will supply the corresponding reference
        graph(s)
        """
class BaseArchive(Archive[IdT]):
    """Partial :class:`Archive` implementation that expresses saving, history loading and the
    id type lookup in terms of the remaining abstract methods (``bulk_write``,
    ``get_snapshot_refs``, ``load``)."""

    # Subclasses set this to the concrete id type; it is returned by get_id_type()
    ID_TYPE = None  # type: Type[IdT]

    @classmethod
    def get_id_type(cls) -> Type[IdT]:
        """Return the id type declared on the class through ``ID_TYPE``.

        An assertion fails if ``ID_TYPE`` has not been set to a truthy value.
        """
        assert cls.ID_TYPE, "The ID type has not been set on this archive"
        return cls.ID_TYPE

    def save(self, record: DataRecord):
        """Save a single record by issuing a one-element bulk write of an ``Insert``
        operation.  Returns whatever ``bulk_write`` returns."""
        return self.bulk_write([operations.Insert(record)])

    def save_many(self, records: Sequence[DataRecord]):
        """
        Save all the records using a single bulk write containing one ``Insert`` operation per
        record.  Subclasses may want to override this behaviour if they have a more efficient
        way of saving multiple records at once.
        """
        self.bulk_write([operations.Insert(record) for record in records])

    def history(self, obj_id: IdT, idx_or_slice) -> Union[DataRecord, Sequence[DataRecord]]:
        """Load the snapshot record(s) of ``obj_id`` selected by ``idx_or_slice``.

        :param obj_id: the id of the object whose snapshots should be loaded
        :param idx_or_slice: an index or a slice applied to the sequence returned by
            ``get_snapshot_refs``
        :return: a single record for a plain index or for a slice that selects exactly one
            snapshot, otherwise a list of records (empty if the slice selects nothing)
        """
        refs = self.get_snapshot_refs(obj_id)[idx_or_slice]

        if not isinstance(idx_or_slice, slice):
            # A plain index yields one snapshot reference, not a sequence of them, so it must
            # not be passed through len() or indexed again
            return self.load(refs)

        if len(refs) == 1:
            # Kept for backwards compatibility: a one-element slice gives a bare record
            return self.load(refs[0])

        # Zero or several references: always a list
        return [self.load(ref) for ref in refs]

    def construct_archive_id(self, value) -> IdT:  # pylint: disable=no-self-use
        """Always raise ``TypeError``: the base archive cannot build an id from any value.
        Subclasses override this if conversion is possible."""
        raise TypeError("Not possible to construct an archive id from '{}'".format(type(value)))
def scalar_query_spec(specifier: Union[Mapping, Iterable[Any], Any]) -> \
        Union[Any, Dict]:
    """Convenience function to create a query specifier for a given item.  There are three
    possibilities:

    1. The item is a mapping, in which case it is returned as is.
    2. The item is an iterable (but not a mapping, ``str`` or ``bytes``), in which case it is
       passed to ``q.in_`` and so interpreted to mean: {'$in': list(iterable)}
    3. It is a raw item (this includes ``str`` and ``bytes``), in which case it is returned
       unchanged so that it is matched directly.

    :param specifier: a mapping, an iterable of values or a single value
    :return: the query specifier for the item
    """
    # Mappings are iterable too, so this check has to come before the iterable one
    if isinstance(specifier, Mapping):
        return specifier

    # Strings and bytes are iterable, but a single string identifier has to be matched as a
    # whole rather than being split up into its individual characters
    if isinstance(specifier, (str, bytes)):
        return specifier

    if isinstance(specifier, Iterable):
        return q.in_(*specifier)

    return specifier