Source code for mincepy.files
# -*- coding: utf-8 -*-
import pathlib
import shutil
import tempfile
from typing import Optional, BinaryIO, TextIO, Union
from . import type_ids
from . import base_savable
from . import fields
__all__ = "File", "BaseFile"
[docs]class File(base_savable.SimpleSavable):
"""A mincePy file object. These should not be instantiated directly but using Historian.create_file()"""
TYPE_ID = type_ids.FILE_TYPE_ID
READ_SIZE = 256 # The number of bytes to read at a time
def __init__(self, file_store, filename: str = None, encoding=None):
super().__init__()
self._file_store = file_store
self._filename = filename
self._encoding = encoding
self._file_id = None
self._buffer_file = _create_buffer_file()
@fields.field("_filename")
def filename(self) -> Optional[str]:
return self._filename
@fields.field("_encoding")
def encoding(self) -> Optional[str]:
return self._encoding
@fields.field("_file_id")
def file_id(self):
return self._file_id
[docs] def open(self, mode="r", **kwargs) -> Union[BinaryIO, TextIO]:
"""Open returning a file like object that supports close() and read()"""
self._ensure_buffer()
if "b" not in mode:
kwargs.setdefault("encoding", self.encoding)
return open(
self._buffer_file, mode, **kwargs
) # pylint: disable=unspecified-encoding
[docs] def from_disk(self, path):
"""Copy the contents of a disk file to this file"""
with open(str(path), "r", encoding=self.encoding) as disk_file:
with self.open("w") as this:
shutil.copyfileobj(disk_file, this)
[docs] def to_disk(self, path: [str, pathlib.Path]):
"""Copy the contents of this file to disk.
:param path: the path can be either a folder in which case the file contents are written to
`path / self.filename` or path can be a full file path in which case that will be used.
"""
file_path = pathlib.Path(str(path))
if file_path.is_dir():
file_path /= self.filename
with open(str(file_path), "w", encoding=self._encoding) as disk_file:
with self.open("r") as this:
shutil.copyfileobj(this, disk_file)
def write_text(self, text: str, encoding=None):
encoding = encoding or self._encoding
with self.open("w", encoding=encoding) as fileobj:
fileobj.write(text)
[docs] def read_text(self, encoding=None) -> str:
"""Read the contents of the file as text.
This function is named as to mirror pathlib.Path"""
encoding = encoding or self._encoding
with self.open("r", encoding=encoding) as fileobj:
return fileobj.read()
[docs] def save_instance_state(self, saver):
filename = self.filename or ""
with open(self._buffer_file, "rb") as fstream:
self._file_id = self._file_store.upload_from_stream(filename, fstream)
return super().save_instance_state(saver)
[docs] def load_instance_state(self, saved_state, loader):
super().load_instance_state(saved_state, loader)
self._file_store = loader.get_archive().file_store
# Don't copy the file over now, do it lazily when the file is first opened
self._buffer_file = None
def _ensure_buffer(self):
if self._buffer_file is None:
if self._file_id is not None:
self._update_buffer()
else:
_create_buffer_file()
def _update_buffer(self):
self._buffer_file = _create_buffer_file()
with open(self._buffer_file, "wb") as fstream:
self._file_store.download_to_stream(self._file_id, fstream)
def __str__(self):
contents = [str(self._filename)]
if self._encoding is not None:
contents.append(f"({self._encoding})")
return " ".join(contents)
def __eq__(self, other) -> bool:
"""Compare the contents of two files
If both files do not exist they are considered equal.
"""
if (
not isinstance(other, File) or self.filename != other.filename
): # pylint: disable=comparison-with-callable
return False
try:
with self.open() as my_file:
try:
with other.open() as other_file:
while True:
my_line = my_file.readline(self.READ_SIZE)
other_line = other_file.readline(self.READ_SIZE)
if my_line != other_line:
return False
if my_line == "" and other_line == "":
return True
except FileNotFoundError:
return False
except FileNotFoundError:
# Our file doesn't exist, make sure the other doesn't either
try:
with other.open():
return False
except FileNotFoundError:
return True
[docs] def yield_hashables(self, hasher):
"""Hash the contents of the file"""
try:
with self.open("rb") as opened:
while True:
line = opened.read(self.READ_SIZE)
if line == b"":
return
yield line
except FileNotFoundError:
yield from hasher.yield_hashables(None)
def _create_buffer_file():
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_path = tmp_file.name
return tmp_path
BaseFile = File # Here just for legacy reasons. Deprecate in 1.0