Source code for mincepy.hist.snapshots

# -*- coding: utf-8 -*-
import logging
from typing import Callable

from mincepy import archives
from mincepy import frontend
from mincepy import operations
import mincepy.records as recordsm
from mincepy import result_types

__all__ = ("SnapshotsCollection",)

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name

[docs]class SnapshotsCollection(frontend.ObjectCollection): def __init__(self, historian, archive_collection: archives.Collection): super().__init__( historian, archive_collection, record_factory=lambda record_dict: SnapshotLoadableRecord( record_dict, historian.load_snapshot_from_record ), obj_loader=historian.load_snapshot_from_record, )
[docs] def purge(self, deleted=True, dry_run=True) -> result_types.PurgeResult: """Function to delete various unused objects from the database. This function cannot and will never delete data the is currently in use.""" if deleted: # First find all the object ids of those that have been deleted # pylint: disable=protected-access res = self.records.find( recordsm.DataRecord.state == recordsm.DELETED )._project(recordsm.OBJ_ID) obj_ids = [entry[recordsm.OBJ_ID] for entry in res] # DB HIT logging.debug("Found %i objects that have been deleted", len(obj_ids)) # Need the object id and version to create the snapshot ids # pylint: disable=protected-access res = self.records.find(recordsm.DataRecord.obj_id.in_(*obj_ids))._project( recordsm.OBJ_ID, recordsm.VERSION ) snapshot_ids = [recordsm.SnapshotId(**entry) for entry in res] # DB HIT "Found %i objects with %i snapshots that are deleted, removing.", len(obj_ids), len(snapshot_ids), ) if snapshot_ids and not dry_run: # Commit the changes self._historian.archive.bulk_write( [operations.Delete(sid) for sid in snapshot_ids] )"Deleted %i snapshots", len(snapshot_ids)) return result_types.PurgeResult(set(snapshot_ids))
class SnapshotLoadableRecord(recordsm.DataRecord): __slots__ = () def __new__( cls, record_dict: dict, snapshot_loader: Callable[[recordsm.DataRecord], object] ): loadable = super().__new__(cls, **record_dict) loadable._snapshot_loader = snapshot_loader return loadable def load(self) -> object: return self._snapshot_loader(self)