# Source code for mincepy.depositors

"""This module contains various strategies for loading, saving and migrating objects in the archive
"""

from abc import ABCMeta, abstractmethod
import logging
from typing import Optional, MutableMapping, Any, Iterable, Sequence

from pytray import tree

import mincepy  # pylint: disable=unused-import
from . import archives
from . import exceptions
from . import operations
from . import records

# Public names exported by ``from mincepy.depositors import *``
__all__ = ('Saver', 'Loader', 'SnapshotLoader', 'LiveDepositor')

# Module-level logger used for migration warnings and info messages
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


class Base(metaclass=ABCMeta):
    """Shared foundation for the saver and loader depositors.

    Keeps a handle on the owning historian and offers accessors for it and for
    the archive it exposes.
    """

    def __init__(self, historian):
        # All archive access is routed through the owning historian
        self._historian = historian  # type: mincepy.Historian

    def get_historian(self):
        """
        Return the historian this depositor belongs to.

        :return: the owning historian
        :rtype: mincepy.Historian
        """
        return self._historian

    def get_archive(self) -> archives.Archive:
        """Return the ``archive`` attribute of the owning historian."""
        return self._historian.archive


class Saver(Base, metaclass=ABCMeta):
    """A depositor that knows how to save records to the archive"""

    @abstractmethod
    def ref(self, obj) -> records.SnapshotId:
        """Get a persistent reference for the given object"""

    def encode(self, obj, schema=None, path=()):
        """Encode an object so that it can be stored in the archive.

        Primitive values are passed through ``tree.transform`` so that any values held in
        containers are themselves encoded.  Any other object is converted to its saved state
        by the helper registered for its type, and that saved state is then encoded in turn.

        :param obj: the object to encode
        :param schema: optional list that, if given, has a ``[path, type_id]`` entry (with the
            helper version appended when it is not None) appended for every non-primitive
            object that gets encoded
        :param path: the path of ``obj`` within the state being encoded
        :return: the encoded state
        :raises RuntimeError: if a helper produces a saved state that is not primitive
        """
        historian = self.get_historian()
        if historian.is_primitive(obj):
            # Deal with the special containers by encoding their values if need be
            return tree.transform(self.encode, obj, path, schema=schema)

        # Store by value
        helper = historian.get_helper(type(obj), auto_register=True)
        save_state = helper.save_instance_state(obj, self)
        if not historian.is_primitive(save_state):
            raise RuntimeError("Saved state must be one of the primitive types")

        schema_entry = [path, helper.TYPE_ID]
        version = helper.get_version()
        if version is not None:
            schema_entry.append(version)
        if schema is not None:
            # The entry for this object is recorded before those of anything it contains
            schema.append(schema_entry)

        return self.encode(save_state, schema, path)

    def save_state(self, obj):
        """Save the state of an object and return the encoded state ready to be archived in a
        record.

        :param obj: the object to save
        :return: a dictionary holding the encoded state under ``records.STATE`` and the type
            schema under ``records.STATE_TYPES``
        """
        schema = []
        saved_state = self.encode(obj, schema)
        return {records.STATE: saved_state, records.STATE_TYPES: schema}


class Loader(Base, metaclass=ABCMeta):
    """A loader that knows how to load objects from the archive"""

    def decode(self,
               encoded,
               schema: records.StateSchema = None,
               path=(),
               created_callback=None,
               updates=None):
        """Given the encoded state and an optional schema that defines the type of the encoded
        objects this method will decode the saved state and load the object.

        :param encoded: the encoded state to decode
        :param schema: optional mapping from paths to schema entries (providing ``type_id`` and
            ``version``); paths not present are treated as plain (non-object) values
        :param path: the path of ``encoded`` within the root state
        :param created_callback: if given, called as ``created_callback(path, new_obj)`` each
            time an object is created
        :param updates: if given, receives ``path -> state`` for every object, at any depth,
            whose state was brought up to date by its helper
        :return: the decoded value
        :raises RuntimeError: if a helper fails to create a new object
        """
        if schema is None:
            # Without a schema nothing is a stored object, so just unpack any containers
            return self._recursive_unpack(encoded, schema, path, created_callback, updates)

        try:
            entry = schema[path]
        except KeyError:
            # Not an object itself, but it may contain objects; keep tracking updates for them
            return self._recursive_unpack(encoded, schema, path, created_callback, updates)

        saved_state = encoded
        helper = self.get_historian().get_helper(entry.type_id)
        if helper.IMMUTABLE:
            # Immutable objects are created from their fully unpacked state
            saved_state = self._recursive_unpack(encoded, schema, path, created_callback, updates)

        new_obj = helper.new(saved_state)
        if new_obj is None:
            raise RuntimeError("Helper '{}' failed to create a class given state '{}'".format(
                helper.__class__, saved_state))

        if created_callback is not None:
            created_callback(path, new_obj)

        if not helper.IMMUTABLE:
            # Mutable objects are created (and announced) before their contents are unpacked,
            # presumably so that anything referring back to them can find them
            saved_state = self._recursive_unpack(encoded, schema, path, created_callback, updates)

        updated = helper.ensure_up_to_date(saved_state, entry.version, self)
        if updated is not None:
            # Use the current version of the record
            saved_state = updated
            if updates is not None:
                updates[path] = updated

        helper.load_instance_state(new_obj, saved_state, self)
        return new_obj

    def _recursive_unpack(self,
                          encoded_saved_state,
                          schema: records.StateSchema = None,
                          path=(),
                          created_callback=None,
                          updates=None):
        """Unpack a saved state expanding any contained objects"""
        return tree.transform(self.decode,
                              encoded_saved_state,
                              path,
                              schema=schema,
                              created_callback=created_callback,
                              updates=updates)


class LiveDepositor(Saver, Loader):
    """Depositor with strategy that all objects that get referenced should be saved"""

    def ref(self, obj) -> Optional[records.SnapshotId]:
        """Get a reference to an object, saving the object if it is not currently being saved.

        :param obj: the object to get a reference for
        :return: the snapshot id of the object, or None if ``obj`` is None
        """
        if obj is None:
            return None

        # Try getting it from the transaction as there may be one from an in-progress save.
        # We can't use historian.get_ref here because we _only_ want one that's currently
        # being saved or we should try saving it as below to ensure it's up to date
        trans = self._historian.current_transaction()
        if trans is not None:
            try:
                return trans.get_reference_for_live_object(obj)
            except exceptions.NotFound:
                pass

        # Then we have to save it and get the resulting reference
        return self._historian._save_object(obj, self).snapshot_id  # pylint: disable=protected-access

    def load(self, reference: records.SnapshotId):
        """Load the object identified by a reference.

        Only the reference's object id is used.  The historian is asked for the object first
        (``get_obj``), and if that raises ``NotFound`` the object is loaded via the historian.

        :param reference: the reference to load
        :return: the loaded object
        """
        try:
            return self._historian.get_obj(reference.obj_id)
        except exceptions.NotFound:
            return self._historian._load_object(reference.obj_id, self)  # pylint: disable=protected-access

    def load_from_record(self, record: records.DataRecord):
        """Load an object from a record.

        The root object is inserted into the transaction as a live object as soon as it is
        created.  A warning is logged if any part of the state had to be brought up to date.
        """
        with self._historian.transaction() as trans:

            def created(path, new_obj):
                """Called each time an object is created whilst decoding"""
                # For the root object, put it into the transaction as a live object
                if not path:
                    trans.insert_live_object(new_obj, record)

            updates = {}
            loaded = self.decode(record.state,
                                 record.get_state_schema(),
                                 created_callback=created,
                                 updates=updates)
            if updates:
                logger.warning(
                    "Object snapshot '%s' is at an older version than your current codebase. It "
                    "can be migrated by using `mince migrate` from the command line. If this "
                    "object is saved the new entry will use the new version.", record.snapshot_id)
            return loaded

    def update_from_record(self, obj, record: records.DataRecord) -> bool:
        """Do an in-place update of an object from a record.

        :param obj: the live object to update
        :param record: the record whose state should be loaded into ``obj``
        :return: True
        """
        historian = self.get_historian()
        helper = historian.get_helper(type(obj))
        with historian.transaction() as trans:
            # Make sure the record is in the transaction with the object
            trans.insert_live_object(obj, record)

            saved_state = self._recursive_unpack(record.state, record.get_state_schema())
            helper.load_instance_state(obj, saved_state, self)
            return True

    def save_from_builder(self, obj, builder: records.DataRecordBuilder):
        """Save a live object.

        :param obj: the object to save
        :param builder: the record builder, which must already have its snapshot hash set
        :return: the built record, which is staged for insertion in the current transaction
        """
        from . import process  # pylint: disable=import-outside-toplevel

        assert builder.snapshot_hash is not None, \
            "The snapshot hash must be set on the builder before saving"
        historian = self.get_historian()
        with historian.transaction() as trans:
            # Insert the object into the transaction so others can refer to it
            ref = records.SnapshotId(builder.obj_id, builder.version)
            trans.insert_live_object_reference(ref, obj)

            # Deal with a possible object creator
            if builder.version == 0:
                creator = process.CreatorsRegistry.get_creator(obj)
                if creator is not None:
                    # Found one
                    builder.extras[records.ExtraKeys.CREATED_BY] = self.ref(creator).obj_id

            # Now ask the object to save itself and create the record
            builder.update(self.save_state(obj))
            record = builder.build()

            # Insert the record into the transaction
            trans.insert_live_object(obj, record)
            trans.stage(operations.Insert(record))  # Stage it for being saved

            return record


class SnapshotLoader(Loader):
    """Responsible for loading snapshots.  This object should not be reused and only one external
    call to `load` should be made.  This is because it keeps an internal cache."""

    def __init__(self, historian):
        super().__init__(historian)
        # Cache of snapshots loaded so far (None for deleted records), keyed by snapshot id
        self._snapshots = {}  # type: MutableMapping[records.SnapshotId, Any]

    def load(self, sid: records.SnapshotId) -> Any:
        """Load an object from its snapshot id.

        :param sid: the snapshot id to load
        :return: the snapshot, or None if the record is a deleted record
        :raises TypeError: if ``sid`` is not a ``SnapshotId``
        """
        if not isinstance(sid, records.SnapshotId):
            raise TypeError(sid)

        try:
            snapshot = self._snapshots[sid]
        except KeyError:
            record = self.get_archive().load(sid)
            if record.is_deleted_record():
                snapshot = None
            else:
                snapshot = self.load_from_record(record)

            # Cache it
            self._snapshots[sid] = snapshot

        return snapshot

    def load_from_record(self, record: records.DataRecord) -> Any:
        """Decode a record into a snapshot and register it with the current transaction.

        A warning is logged if any part of the state had to be brought up to date.
        """
        with self._historian.transaction() as trans:
            updates = {}
            obj = self.decode(record.state, record.get_state_schema(), updates=updates)
            trans.insert_snapshot(obj, record.snapshot_id)
            if updates:
                logger.warning(
                    "Object snapshot '%s' is at an older version than your current codebase. It "
                    "can be migrated by using `mince migrate` from the command line.",
                    record.snapshot_id)
            return obj


class Migrator(Saver, Loader):
    """Depositor that re-saves records whose decoded state had to be brought up to date"""

    def ref(self, obj) -> records.SnapshotId:
        """Get the snapshot id of an object, saving the object if it has never been saved.

        The historian is asked first, then the snapshots of the current transaction (if any) are
        searched by identity.  If neither knows the object, it is saved.
        """
        historian = self.get_historian()
        try:
            return historian.get_snapshot_id(obj)
        except exceptions.NotFound:
            pass

        # Ok, try the current transaction
        trans = historian.current_transaction()
        if trans is not None:
            for sid, snapshot in trans.snapshots.items():
                if obj is snapshot:
                    return sid

        # Ok, it's a brand new object that's never been saved, so save it
        historian.save_one(obj)
        return historian.get_snapshot_id(obj)

    def migrate_records(self,
                        to_migrate: Iterable[records.DataRecord]) -> Sequence[records.DataRecord]:
        """Migrate multiple records.

        Each record is decoded and, if any part of its state had to be brought up to date, an
        update of that record is staged in a single transaction.

        :param to_migrate: the records to consider for migration
        :return: a list of the records that were migrated
        """
        migrated = []
        with self.get_historian().transaction() as trans:
            for record in to_migrate:
                updates = {}
                obj = self.decode(record.state, record.get_state_schema(), updates=updates)
                if updates:
                    self._migrate_record(record, obj, trans)
                    migrated.append(record)
        return migrated

    def _migrate_record(self, record, new_obj, trans):
        """Given the current record and the corresponding (up to date) instance, re-encode the
        instance and stage an update of the record's state in the supplied transaction."""
        # save_state returns the encoded state and its schema under the record state keys
        trans.stage(operations.Update(record.snapshot_id, self.save_state(new_obj)))
        logger.info("Snapshot %s has been migrated to the latest version", record.snapshot_id)