# Source code for mincepy.process

import contextlib
import functools
from typing import MutableMapping, Any
import uuid

from . import base_savable
from . import utils

__all__ = 'Process', 'track'


def track(func):
    """Decorate a method so that every call is made inside ``self.running()``.

    The first positional argument of the decorated callable must provide a
    ``running()`` method returning a context manager.

    :param func: the method to wrap
    :return: the wrapping function, carrying the metadata of ``func``
    """

    def tracked(self, *args, **kwargs):
        # Enter the owner's running context for the duration of the call
        with self.running():
            return func(self, *args, **kwargs)

    return functools.update_wrapper(tracked, func)


class Process(base_savable.SimpleSavable):
    """A named process that keeps track of whether it is currently running.

    While inside :meth:`running` the process sits on the class-level ``STACK`` so that the
    innermost running process can be retrieved using :meth:`current_process`.
    """
    TYPE_ID = uuid.UUID('bcf03171-a1f1-49c7-b890-b7f9d9f9e5a2')
    # Class-level list of the processes currently inside running(), innermost last
    STACK = []
    # NOTE(review): presumably the attribute names handled by SimpleSavable - confirm in base_savable
    ATTRS = '_name', '_running'

    @classmethod
    def current_process(cls):
        """Get the process on top of the stack.

        :return: the last process pushed onto ``STACK`` or ``None`` if the stack is empty
        """
        if not cls.STACK:
            return None
        return cls.STACK[-1]

    def __init__(self, name: str):
        """
        :param name: the name of this process
        """
        super().__init__()
        self._name = name
        # Number of running() contexts of this process that are currently active
        self._running = 0

    def __eq__(self, other):
        """Two processes compare equal if they have the same name."""
        if not isinstance(other, Process):
            # Let the other operand have a go; Python falls back to identity (i.e. False here)
            return NotImplemented

        return self.name == other.name

    @property
    def is_running(self):
        """True if at least one :meth:`running` context of this process is active."""
        return self._running != 0

    @property
    def name(self) -> str:
        """The name given to this process at construction."""
        return self._name

    @contextlib.contextmanager
    def running(self):
        """Context manager that marks this process as running.

        Pushes the process onto ``STACK`` on entry and pops it again on exit.

        :raises RuntimeError: if, on exit, this process is no longer on top of the stack
        """
        self.STACK.append(self)
        self._running += 1
        try:
            yield
        finally:
            # Undo our own count first so that is_running is correct even if the stack check fails
            self._running -= 1
            # Compare by identity: __eq__ only compares names, so a different process that happens
            # to share our name must not pass for us.  Also guard against an emptied stack which
            # would otherwise surface as an unhelpful IndexError.
            if not self.STACK or self.STACK[-1] is not self:
                raise RuntimeError("Someone has corrupted the process stack!\n"
                                   "Expected to find '{}' on top but found:{}".format(
                                       self, self.STACK))

            self.STACK.pop()
class CreatorsRegistry:
    """Global registry mapping each object to its creator.

    All state lives on the class and is accessed through class methods; the class itself
    cannot be instantiated.
    """
    _creators = utils.WeakObjectIdDict()  # type: MutableMapping[Any, Any]

    def __init__(self):
        raise RuntimeError("Cannot be instantiated")

    @classmethod
    def created(cls, obj):
        """Notify the registry that an object has been created.

        If a process is currently running it is recorded as the creator of the object,
        otherwise nothing is stored.  The historian tracks the creator for saving when the
        object is saved.

        :param obj: the object that was created
        :type obj: mincepy.SavableObject
        """
        current = Process.current_process()
        if current is None:
            # Created outside of any running process, nothing to record
            return

        cls._creators[obj] = current

    @classmethod
    def get_creator(cls, obj):
        """Look up the creator of the passed object.

        :param obj: the object to get the creator of
        :type obj: mincepy.SavableObject
        :return: the registered creator or ``None`` if there isn't one
        :rtype: Optional[mincepy.SavableObject]
        """
        registry = cls._creators
        return registry.get(obj, None)

    @classmethod
    def set_creator(cls, obj, creator):
        """Explicitly set the creator of an object, replacing any existing entry.

        This should not typically be called by user code.

        :param obj: the object to set the creator of
        :type obj: mincepy.SavableObject
        :param creator: the creating object
        :type creator: mincepy.SavableObject
        """
        registry = cls._creators
        registry[obj] = creator