Source code for artisan._artifacts

'''
Functionality for defining persistent, parameterizable computed asset types.

Exported definitions:
    Artifact (abstract `Target` subclass): A typed view into a directory.
        `Artifact` is intended to be subclassed by application authors.
    DynamicArtifact (`Artifact` subclass): An artifact with dynamic fields.
    ProxyArtifactField (class): An artifact field that does not yet exist.
    build (function): Build a target from a specification.
    recover (function): Recover an existing artifact.

Internal definitions:
    active_builder (context variable): The function called to write files into
        artifact directories.
    active_root (context variable): The default directory for artifact
        creation and search.
'''

from __future__ import annotations

import json, re, shutil
from contextvars import ContextVar
from datetime import datetime
from functools import reduce
from itertools import count
from os import PathLike
from os.path import lexists
from pathlib import Path
from tempfile import TemporaryDirectory
from time import sleep
from typing import (
    TYPE_CHECKING, Any, Callable, ClassVar, Iterable,
    Iterator, List, Literal, MutableMapping, Optional,
    Type, TypeVar, Union, cast, final)

import numpy as np

from ._cbor_io import read_cbor_file, write_object_as_cbor
from ._fs_index import DirIndex
from ._misc_io import (
    read_json_file, read_numpy_file, read_opaque_file,
    read_text_file, write_path)
from ._namespaces import Namespace, dictify, namespacify
from ._targets import Target, TargetType, active_scope

__all__ = [
    'Artifact', 'DynamicArtifact', 'ProxyArtifactField',
    'active_builder', 'active_root', 'build', 'recover']

if not TYPE_CHECKING:
    # Redefine `MutableMapping` to make it
    # compatible with artifact types.
    #
    # At runtime (but not during static type checking) this shadows the
    # `MutableMapping` imported from `typing` with a plain subclass of
    # `collections.abc.MutableMapping` whose subscription is a no-op.
    import collections.abc
    class MutableMapping(collections.abc.MutableMapping):
        def __class_getitem__(cls, key: type) -> type:
            # `MutableMapping[K, V]` evaluates to the class itself, so the
            # subscripted form can be used directly as a base class.
            return cls



#-- Type variables and aliases -------------------------------------------------

# An unconstrained type variable.
T = TypeVar('T')
# Any subclass of `Target`.
SomeTarget = TypeVar('SomeTarget', bound=Target)
# Any subclass of `Artifact` (a forward reference to a class defined below).
SomeArtifact = TypeVar('SomeArtifact', bound='Artifact')

# A reader takes a path and returns an object representing the data there.
Reader = Callable[[Path], Any]
# A writer takes a path and a data object and returns a string; per the
# `Artifact` documentation, that string is the extension of the written file.
Writer = Callable[[Path, Any], str]
# The attribute-access modes an artifact can be in.
AccessMode = Literal['read-sync', 'read-async', 'write']



#-- Context-local state --------------------------------------------------------

def default_builder(artifact: Artifact, spec: object) -> None:
    '''
    Run `artifact.__init__(spec)` with the artifact temporarily switched to
    "write" mode, recording build events via `log`.

    A "Start" event is logged before `__init__` is called. It is followed by a
    "Success" event, or, if an `Exception` is raised, by a "Failure" event that
    carries the exception's message, after which the exception is re-raised.
    The artifact's previous access mode is restored once the build attempt is
    over, whether or not it succeeded.
    '''
    saved_mode = artifact._mode_
    artifact._mode_ = 'write'
    log(artifact, 'Start')
    try:
        artifact.__init__(spec) # type: ignore
        log(artifact, 'Success')
    except Exception as error:
        log(artifact, 'Failure', message=str(error))
        raise error
    finally:
        # Runs on success and on failure alike.
        artifact._mode_ = saved_mode


# Context variable holding the artifact root directory; it defaults to the
# relative path ".". The string literal joined to the assignment by `; \` is a
# no-op expression statement that documents the variable.
active_root = ContextVar('artisan:root', default=Path('.')); \
    '''
    The default directory for artifact creation, and the directory that will be
    searched for matches when an artifact is instantiated from a specification.
    '''


# Context variable holding the artifact builder; it defaults to
# `default_builder`. The trailing string literal documents the variable in the
# same way as above.
active_builder = ContextVar('artisan:builder', default=default_builder); \
    '''
    The function called to write files into artifact directories. It accepts two
    arguments, the artifact to construct and its specification.
    '''



#-- `ArtifactType` -------------------------------------------------------------

class ArtifactType(TargetType):
    '''
    The metaclass of `Artifact`.

    It makes calling an artifact type skip `__init__`, and it gives artifact
    types an "@" operator (`SomeArtifactType @ path`) that recovers an existing
    artifact from a directory.
    '''
    def __call__(cls: Type[T], *args: object, **kwargs: object) -> T:
        '''
        Create an instance through `__new__` only; `__init__` is not invoked.
        '''
        # `_refine_call_args` returns the class and positional arguments that
        # should actually be used for construction.
        refined_cls, refined_args = cls._refine_call_args(args) # type: ignore
        return refined_cls.__new__(refined_cls, *refined_args, **kwargs) # type: ignore

    def __matmul__(cls: Type[T], path: Union[PathLike, str]) -> T:
        '''
        Implement `cls @ path` by delegating to `recover`, which returns the
        artifact corresponding to the directory at `path`.
        '''
        return recover(cls, path) # type: ignore



#-- `Artifact` -----------------------------------------------------------------

class Artifact(Target, metaclass=ArtifactType):
    '''
    A typed view into a directory.

    **Instantiation**

    When an artifact is instantiated, Artisan will search for a directory with a
    matching `_meta_.json` file in the active context's root directory. The
    search is recursive, but when a directory with a `_meta_.json` file is
    found, its subdirectories will not be searched. If a matching directory does
    not exist, a new directory will be created, and the active context's
    artifact builder will be called to build the artifact there. If `spec` has a
    `_path_` field, the artifact will be located at that path. The "@" operator,
    as in `ArtifactType @ path`, can be used to load an existing artifact
    without requiring it to match a specification. In the default context, the
    root directory is the current working directory, and the artifact builder
    calls `__init__` and logs metadata to `_meta_.json`.

    **Reading and writing files**

    Reading from and writing to an artifact's attributes corresponds to reading
    from and writing to files in the corresponding directory. Support for new
    file types can be added by overriding an artifact type's list of readers
    (`_readers_`) and/or its list of writers (`_writers_`). Readers should be
    functions that accept a path and return an object representing the data
    stored at that path. Writers should accept an extensionless path and a data
    object, write the data to a version of the path with an appropriate
    extension, and return that extension. To support concurrent reading and
    writing, files generated by writers are not moved into the artifact's
    directory until after the writer returns.

    When reading from or writing to files, the first reader/writer not to raise
    an exception when called will be used. For performance, Artisan may skip
    writers whose data argument type does not match the object being stored.
    Similarly, Artisan may skip readers whose path argument type is annotated
    with an incompatible extension requirement, *e.g.* `Annotated[Path, '.txt']`
    or `Annotated[Path, '.jpg', '.jpeg']`. The `Annotated` type constructor can
    be imported from the `typing` module in Python 3.9+ and the
    `typing_extensions` module in earlier versions.

    **Attribute-access modes**

    Artifacts can be instantiated in "read-sync", "read-async", or "write" mode.
    In "read-sync" mode, attribute accesses will only return after the artifact
    has finished building. In "read-async" mode, attribute accesses will return
    as soon as a corresponding file or directory exists. In "write" mode,
    attribute accesses will return immediately, but a `ProxyArtifactField` will
    be returned if no corresponding file or directory is present. Artifacts are
    instantiated in "read-sync" mode by default, but if `spec` has a `_mode_`
    attribute, that mode will be used. The default builder always executes
    artifacts' `__init__` methods in "write" mode (and other builders should as
    well), so it is generally only necessary to specify `_mode_` when
    "read-async" behavior is desired.

    Arguments:
        spec (Artifact.Spec): The artifact's specification.

    :var _readers_:
        The deserialization functions artifacts of this type will
        try to use when their attributes are being accessed.
    :var _writers_:
        The serialization functions artifacts of this type will
        try to use when their attributes are being assigned to.
    :var _path_:
        The artifact's path on the filesystem.
    :var _mode_:
        The artifact's attribute-access mode.

    :vartype _writers_: ClassVar[List[Callable]]
    :vartype _readers_: ClassVar[List[Callable]]
    :vartype _path_: Path
    :vartype _mode_: Literal['read-sync', 'read-async', 'write']
    '''
    # Readers are tried in list order by `__getattr__`; a reader signals that
    # it cannot handle a path by raising `ValueError`.
    _readers_: ClassVar[List[Reader]] = [
        read_cbor_file,
        read_text_file,
        read_json_file,
        read_numpy_file,
        read_opaque_file]

    # Writers are tried in list order by `__setattr__`; a writer signals that
    # it cannot handle a value by raising `TypeError`.
    _writers_: ClassVar[List[Writer]] = [
        write_path,
        write_object_as_cbor]

    # Per-instance state. `__new__` and `recover` store these directly in the
    # instance's `__dict__` rather than assigning them as attributes.
    _path_: Path
    _mode_: AccessMode
    _index: DirIndex

    def __new__(cls: Type[SomeArtifact], spec: object) -> SomeArtifact:
        '''
        Return an artifact for `spec`, reusing a matching directory if one
        exists and otherwise creating a stub directory and invoking the active
        builder on it.

        The access mode is taken from `spec._mode_` when present and defaults
        to "read-sync".
        '''
        # Locate an existing match, or allocate a fresh stub directory.
        found = find_match(cls, spec)
        path = found or make_stub(cls, spec)

        # Populate the instance state through `__dict__`, bypassing
        # `__setattr__`.
        artifact = Target.__new__(cls, spec)
        artifact.__dict__.update(
            _path_=path,
            _mode_=getattr(spec, '_mode_', 'read-sync'),
            _index=DirIndex(path))

        # Only artifacts that did not already exist need to be built.
        if found is None:
            active_builder.get()(artifact, spec)

        # Building has been initiated at this point,
        # but it has not necessarily finished.
        return artifact

[docs] def __dir__(self) -> List[str]: ''' Return the names of this artifact's attributes. ''' return (sorted(self._index.get_entry_names()) + cast(list, super().__dir__()))
[docs] def __getattr__(self, key: str) -> Any: ''' Return the data stored at `{self._path_}/{key}{inferred_extension}`. ''' if self._mode_ == 'read-sync': while self._is_building(): sleep(0.001) path = self._index.get_entry_path(key) if path is None: raise AttributeError(f"Attribute not found: '{key}'") elif self._mode_ == 'read-async': path = self._index.get_entry_path(key) while path is None and self._is_building(): sleep(0.001) path = self._index.get_entry_path(key) if path is None: raise AttributeError(f"Attribute not found: '{key}'") else: path = self._index.get_entry_path(key) if path is None: return ProxyArtifactField(self, key) if path.is_dir(): return recover(Artifact, path, self._mode_) for reader in self._readers_: try: return reader(path) except ValueError: pass raise OSError(f'Unsupported content type at "{path}"')
[docs] def __setattr__(self, key: str, value: object) -> None: ''' Write data to `{self._path_}/{key}{inferred_extension}`. The extension is determined based on the type of data stored. For performance, other entries with matching keys are not deleted. Use `__delattr__` to delete those entries. ''' if self._mode_ in ('read-sync', 'read-async') or key in self.__dict__: super().__setattr__(key, value) return if isinstance(value, Artifact): (self._path_ / key).symlink_to( value._path_, target_is_directory=True) return with TemporaryDirectory() as temp_root: temp_path = Path(temp_root, 'file') for writer in self._writers_: try: suffix = writer(temp_path, value) dst = (self._path_ / key).with_suffix(suffix) temp_path.replace(dst) self._index.set_entry_path(key, dst) return except TypeError: pass raise OSError(f'Unsupported content type: {type(value)}')
[docs] def __delattr__(self, key: str) -> None: ''' Delete all entries in `self._path_` with the given stem. ''' for path in list(self._path_.iterdir()): if path.stem == key: if path.is_dir(): shutil.rmtree(path) else: path.unlink()
[docs] def __fspath__(self) -> str: ''' Return this artifact's path, as a string. ''' return str(self._path_)
[docs] def __truediv__(self, entry_name: str) -> Path: ''' Return `self._path_ / entry_name`. ''' return self._path_ / entry_name
def _is_building(self) -> bool: ''' Return whether this artifact is currently being built. ''' meta = self._index.get_meta() events = meta['events'] if isinstance(meta, dict) else [] started = any(e['type'] == 'Start' for e in events) finished = any(e['type'] in ('Success', 'Failure') for e in events) return started and not finished #-- `DynamicArtifact` ---------------------------------------------------------- @final class DynamicArtifact(Artifact, MutableMapping[str, T]): ''' An artifact with dynamically named fields. Item access, assignment, deletion, and iteration can be used in place of attribute access, assignment, deletion, and iteration '''
[docs] def __len__(self) -> int: return len(list(self._index.get_entry_names()))
[docs] def __iter__(self) -> Iterator[str]: return (name for name in sorted(self._index.get_entry_names()) if name[0] not in '._')
[docs] def __contains__(self, key: object) -> bool: return ((self._index.get_entry_path(key) is not None) if isinstance(key, str) else False)
[docs] def __getitem__(self, key: str) -> T: return self.__getattr__(key)
[docs] def __setitem__(self, key: str, val: object) -> None: self.__setattr__(key, val)
[docs] def __delitem__(self, key: str) -> None: self.__delattr__(key)
#-- `ProxyArtifactField` -------------------------------------------------------

@final
class ProxyArtifactField:
    '''
    An artifact field that does not yet exist.

    Corresponding files and/or directories will
    be created when it is written to.

    A proxy records the artifact it hangs off (`_root`) and the chain of field
    names leading to it (`_keys`). Reading an attribute or item of a proxy
    yields a deeper proxy until the corresponding path exists, at which point
    the lookup is forwarded to the real field. Double-underscore ("dunder")
    names are never treated as fields.
    '''
    _root: Artifact
    _keys: List[str]

    def __init__(self, root: Artifact, *keys: str) -> None:
        # Stored through `__dict__` because `__setattr__` is overridden to
        # write to the artifact instead of to this object.
        self.__dict__['_root'] = root
        self.__dict__['_keys'] = keys

    def __getattr__(self, key: str) -> ProxyArtifactField:
        '''
        Return the field `key` of the field this proxy stands for, or a deeper
        proxy if this proxy's own path does not exist yet.

        Raises:
            AttributeError: If `key` is a dunder name or names this proxy's
                own (uninitialized) state.
        '''
        # Protocol probes such as `hasattr(proxy, '__setstate__')` (made by
        # `copy` and `pickle`, among others) must fail instead of producing a
        # proxy, and a lookup of missing internal state must not recurse.
        if (key.startswith('__') and key.endswith('__')
                or key in ('_root', '_keys')):
            raise AttributeError(key)

        if lexists(self._root._path_.joinpath(*self._keys)):
            return getattr(reduce(getattr, self._keys, self._root), key)
        else:
            return ProxyArtifactField(self._root, *self._keys, key)

    def __setattr__(self, key: str, value: object) -> None:
        '''
        Create the directory this proxy stands for (if necessary) and assign
        `value` to its field `key`.
        '''
        Path(self._root._path_, *self._keys).mkdir(parents=True, exist_ok=True)
        setattr(reduce(getattr, self._keys, self._root), key, value)

    def __delattr__(self, key: str) -> None:
        '''
        Delete the field `key` of the field this proxy stands for.
        '''
        delattr(reduce(getattr, self._keys, self._root), key)

    def __getitem__(self, key: str) -> Any:
        '''
        Equivalent to `getattr(self, key)`.
        '''
        return self.__getattr__(key)

    def __setitem__(self, key: str, val: object) -> None:
        '''
        Equivalent to `setattr(self, key, val)`.
        '''
        self.__setattr__(key, val)

    def __delitem__(self, key: str) -> None:
        '''
        Equivalent to `delattr(self, key)`.
        '''
        self.__delattr__(key)

    def append(self, item: object) -> None:
        '''
        Extend the field with a single item. NumPy arrays are given a new
        leading axis (`item[None]`); other items are wrapped in a list.
        '''
        if isinstance(item, np.ndarray):
            self.extend(item[None])
        else:
            self.extend([item])

    def extend(self, items: Iterable[object]) -> None:
        '''
        Extend the field this proxy stands for with `items`, creating it (and
        the directories containing it) if it does not exist yet.
        '''
        *wrapper_keys, field_key = self._keys
        Path(self._root._path_, *wrapper_keys).mkdir(parents=True, exist_ok=True)
        wrapper = reduce(getattr, wrapper_keys, self._root)
        field = getattr(wrapper, field_key)
        if isinstance(field, ProxyArtifactField):
            # The field still does not exist: create it from `items`.
            setattr(wrapper, field_key, items)
        else:
            field.extend(items)



#-- Artifact/target-creation functions -----------------------------------------

def build(cls: Type[SomeTarget],
          spec: object,
          *args: object,
          **kwargs: object) -> SomeTarget:
    '''
    Build a target from a specification.

    To support using JSON-encodable objects as specifications, mappings in
    `spec` are converted to namespaces and artifact-root-relative path strings
    (strings starting with "@/") are converted to artifacts if they point to an
    existing directory, and `Path` objects otherwise.

    `args` and `kwargs` are forwarded to the target's constructor.
    '''
    return cls(namespacify(spec, decode_path), *args, **kwargs) # type: ignore


def recover(cls: Type[SomeArtifact],
            path: Union[PathLike, str],
            mode: str = 'read-sync') -> SomeArtifact:
    '''
    Recover an existing artifact.

    `mode` must be "read-sync", "read-async", or "write".

    The artifact's class is looked up in the active scope from the type name
    recorded in the directory's metadata. If that lookup is skipped or fails,
    `DynamicArtifact` is used.

    Raises:
        ValueError: If the inferred class is not a subclass of `cls`.
    '''
    # Expand "~" and "@" in the path.
    path = resolve(path)

    # Infer the class based on `_meta_.json`.
    refined_cls: Type[SomeArtifact] = DynamicArtifact # type: ignore
    if cls is not DynamicArtifact:
        try:
            meta = DirIndex(path).get_meta()
            cls_spec = meta['spec']['type'] # type: ignore
            refined_cls = active_scope.get()[cls_spec]
        except (TypeError, KeyError):
            # Missing or malformed metadata, or a type name that is not in
            # scope: keep the `DynamicArtifact` fallback.
            pass

    # Check that the inferred class is valid.
    if not issubclass(refined_cls, cls):
        raise ValueError(f'`{refined_cls}` is not a subclass of `{cls}`.')

    # Construct and return an artifact.
    artifact = Target.__new__(refined_cls)
    artifact.__dict__['_path_'] = path
    artifact.__dict__['_mode_'] = mode
    artifact.__dict__['_index'] = DirIndex(path)
    return artifact



#-- Support functions ----------------------------------------------------------

def find_match(cls: Type[Artifact], spec: object) -> Optional[Path]:
    '''
    Return the path to an artifact matching the given specification, or `None`
    if no such artifact exists.

    A directory matches if its metadata's "spec" entry equals the encoded
    specification and none of its logged events is a "Failure". Only
    `spec._path_` is considered if it is set; otherwise the artifacts under the
    active root directory are searched.
    '''
    root = active_root.get()
    spec_path = getattr(spec, '_path_', None)
    spec_dict = dictify(
        {'type': cls, **vars(spec)},
        path_encoder = encode_path,
        type_encoder = get_type_name)
    candidates = (
        [DirIndex(resolve(spec_path))] if spec_path is not None
        else DirIndex(root).get_artifacts())

    for dir_index in candidates:
        meta = dir_index.get_meta()
        if (isinstance(meta, dict)
            and meta['spec'] == spec_dict
            and all(e['type'] != 'Failure' for e in meta['events'])):
            return dir_index.path

    return None


def make_stub(cls: Type[Artifact], spec: object) -> Path:
    '''
    Create a new directory for an artifact with the given type and
    specification, and initialize its `_meta_.json` file.

    The directory is `spec._path_` if that is set. Otherwise, it is the first
    path of the form `{root}/{type_name}_{i:04x}` that does not exist yet.

    Raises:
        FileExistsError: If `spec._path_` is set and already exists.
    '''
    root = active_root.get()
    spec_path = getattr(spec, '_path_', None)
    spec_dict = dictify({'type': cls, **vars(spec)}, encode_path, get_type_name)
    generated_paths = (root / f'{spec_dict["type"]}_{i:04x}' for i in count())
    candidates = [resolve(spec_path)] if spec_path else generated_paths

    for path in candidates:
        try:
            # `mkdir` doubles as the existence check, so claiming a directory
            # is a single step.
            path.mkdir(parents=True)
            meta = {'spec': spec_dict, 'events': []}
            write_json_atomically(path / '_meta_.json', meta)
            return path
        except FileExistsError:
            continue
    else:
        # Only reachable for an explicit path; generated paths never run out.
        raise FileExistsError(f'Incompatible files exist at `{path}`.')


def resolve(path: Union[PathLike, str]) -> Path:
    '''
    Return an absolute path, dereferencing "~" (the home directory) and "@"
    (the root artifact directory).
    '''
    root = active_root.get()
    # A callable replacement is inserted verbatim. A replacement *string*
    # would have its backslashes processed as escapes, which corrupts or
    # rejects root paths containing them (Windows paths, for example).
    path = Path(re.sub('^@', lambda _: str(root), str(Path(path))))
    path = path.expanduser().resolve()
    return path


def get_type_name(type_: type) -> str:
    '''
    Return the given type's name in the active scope.

    If the given type has multiple names, the lexicographically first name is
    used. If it has no names, a `ValueError` is raised.
    '''
    try:
        scope = active_scope.get()
        return min(k for k, v in scope.items() if v is type_)
    except ValueError:
        # `min` of an empty sequence; hide that implementation detail.
        raise ValueError(
            f'{type_} is not in the current Artisan scope.') from None


def log(artifact: Artifact, type: str, **kwargs: object) -> None:
    '''
    Log a build event (*e.g.* "Start", "Success", or "Failure").

    An entry in the form `{"type": type, "timestamp": timestamp, **kwargs}`
    will be added to the metadata file's event log.
    '''
    timestamp = datetime.now().isoformat()
    meta = json.loads((artifact / '_meta_.json').read_text())
    meta['events'].append(dict(type=type, timestamp=timestamp, **kwargs))
    write_json_atomically(artifact / '_meta_.json', meta)


def write_json_atomically(dst: Path, obj: dict) -> None:
    '''
    Serialize an object to a JSON file, atomically.

    The JSON is written to a scratch file and then moved over `dst`, so
    readers never observe a partially written file.
    '''
    dst = Path(dst)
    # The scratch directory is created next to `dst`, not in the system
    # temporary directory, because `replace` can fail when the source and the
    # destination are on different filesystems. The "." prefix keeps it hidden.
    with TemporaryDirectory(prefix='.', dir=dst.parent) as temp_root:
        temp_path = Path(temp_root, 'f')
        with open(temp_path, 'w') as f:
            json.dump(obj, f, indent=2)
        temp_path.replace(dst)


def encode_path(path: PathLike) -> str:
    '''
    Convert a path-like object to an artifact-root-directory-relative path
    string (a string starting with "@/").

    Paths outside the root directory are expressed with leading ".."
    components.

    Raises:
        ValueError: If the path and the root directory have no common
            ancestor.
    '''
    root = active_root.get().resolve()
    dots: List[str] = []
    path = Path(path).resolve()
    # Walk up from the root until reaching an ancestor of (or equal to) `path`.
    while root not in (*path.parents, path):
        if root.parent == root:
            # The top of the root's tree was reached without finding a common
            # ancestor; stop instead of looping forever.
            raise ValueError(
                f'`{path}` has no common ancestor with the artifact root.')
        root = root.parent
        dots.append('..')
    return '@/' + '/'.join(Path(*dots, path.relative_to(root)).parts)


def decode_path(path_str: str) -> PathLike:
    '''
    Convert a root-directory-relative path string (a string starting with
    "@/") to an artifact if it points to an existing directory, or a `Path`
    object, otherwise.
    '''
    root = active_root.get().resolve()
    # Drop the leading "@/" and interpret the rest relative to the root.
    path = (root / path_str[2:]).resolve()
    return Artifact @ path if path.is_dir() else path # type: ignore