Source code for mincepy.process
# -*- coding: utf-8 -*-
import contextlib
import functools
import uuid
import deprecation
from . import base_savable
from . import tracking
from . import version
__all__ = ("Process",)
@deprecation.deprecated(
deprecated_in="0.14.5",
removed_in="0.16",
current_version=version.__version__,
details="Use mincepy.track() instead",
)
def track(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self.running():
return func(self, *args, **kwargs)
return wrapper
[docs]class Process(base_savable.SimpleSavable):
TYPE_ID = uuid.UUID("bcf03171-a1f1-49c7-b890-b7f9d9f9e5a2")
STACK = []
ATTRS = "_name", "_running"
def __init__(self, name: str):
super().__init__()
self._name = name
self._running = 0
def __eq__(self, other):
if not isinstance(other, Process):
return False
return self.name == other.name
@property
def is_running(self):
return self._running != 0
@property
def name(self) -> str:
return self._name
@contextlib.contextmanager
def running(self):
try:
self._running += 1
with tracking.track(self):
yield
finally:
self._running -= 1