# Source code for artisan._http

'''
HTTP interfaces to Artisan contexts.

Exported definitions:
    API (class): A WSGI server that provides access to an Artisan context.
    WebUI (class): An HTTP responder for HTML UI requests.
    asset_builder (function decorator): <Not yet implemented>
    default_asset_builders (list): <Not yet implemented>
'''

from __future__ import annotations

import json, mimetypes, traceback
from base64 import b64decode
from datetime import datetime, timezone
from io import BufferedReader
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import (
    Callable, Collection, Dict, Iterator, List,
    Mapping, Optional, Tuple, TypeVar, Union, cast)
from wsgiref.simple_server import make_server

# `fcntl` is an optional dependency of this module: when it cannot be imported,
# `locking_is_supported` is set to False and `read_file` skips its `lockf`
# calls.
try:
    from fcntl import LOCK_SH, LOCK_UN, lockf
    locking_is_supported = True
except ImportError:
    locking_is_supported = False

import cbor2

from ._artifacts import Artifact, DynamicArtifact, build
from ._context import Context, get_context, using_context
from ._fs_index import DirIndex
from ._schemas import (
    get_spec_schema, get_spec_dict_schema,
    get_spec_list_schema)

# Generic type variable used by the `asset_builder` decorator signatures
# (`Callable[[T], T]`).
T = TypeVar('T')

# Names exported by `from artisan._http import *`.
__all__ = ['API', 'WebUI', 'asset_builder', 'default_asset_builders']



#-- `API` ----------------------------------------------------------------------

class API:
    '''
    A WSGI server that provides access to an Artisan context.

    The following request types are supported:

    - `GET /artifacts{/path*}`
    - `POST /artifacts`
    - `DELETE /artifacts{/path*}`
    - `GET /schemas/spec`
    - `GET /schemas/spec-list`
    - `GET /schemas/spec-dict`
    - `GET /ui{/path*}`

    Analogous `HEAD` and `OPTIONS` requests are also supported.

    Requests are authorized by the password part of an HTTP Basic
    `Authorization` header (the username is ignored). Requests without such a
    header are treated as having the empty password `''`.

    Arguments:
        permissions: A mapping from passwords to permission sets. Permissions
            sets can contain "read", "write", and/or "delete". The default
            permission policy is `{'': ('read', 'write', 'delete')}`, meaning
            even users who have not provided a password have "read", "write",
            and "delete" permissions.
        ui: The WSGI server that should handle requests in the form
            `/ui{/path*}`. The default UI is a `WebUI`.
        root: A path to override the active context's `root`.
        scope: A mapping to override the active context's `scope`.
        builder: A callable to override the active context's `builder`.
    '''
    def __init__(self, *,
                 permissions: Optional[Mapping[str, Collection[str]]] = None,
                 ui: Optional[Callable[[dict, Callable], None]] = None,
                 root: Union[str, Path, None] = None,
                 scope: Optional[Mapping[str, type]] = None,
                 builder: Optional[Callable[[Artifact, object], None]] = None
                 ) -> None:
        if permissions is None:
            permissions = {'': ('read', 'write', 'delete')}

        # Translate each permission set into the tuple of HTTP methods it
        # unlocks. Multiplying a tuple by a bool keeps it (True) or empties it
        # (False).
        self._allowed_methods = {
            password: (
                ('read' in user_perm) * ('OPTIONS', 'HEAD', 'GET')
                + ('write' in user_perm) * ('POST',)
                + ('delete' in user_perm) * ('DELETE',))
            for password, user_perm in permissions.items()}

        # Snapshot the active context, applying any overrides.
        active_context = get_context()
        self._context = Context(
            root = active_context.root if root is None else root,
            scope = active_context.scope if scope is None else scope,
            builder = active_context.builder if builder is None else builder)

        # The handlers are constructed inside the API's context because they
        # read it at construction time (e.g. `ArtifactAPI` captures `root`).
        with using_context(self._context):
            self._context.root.mkdir(parents=True, exist_ok=True)
            self._request_handlers: Dict[str, Callable] = {
                '/artifacts': ArtifactAPI(),
                '/schemas': SchemaAPI(),
                '/ui': ui if ui is not None else WebUI()}

    @staticmethod
    def _get_password(env: dict) -> Optional[str]:
        '''
        Extract the password from a Basic `Authorization` header.

        Returns `''` when no credentials were sent and `None` when the header
        is present but cannot be decoded as `username:password`.
        '''
        auth = env.get('HTTP_AUTHORIZATION', '')
        try:
            # `binascii.Error` and `UnicodeDecodeError` are both `ValueError`s.
            credentials = b64decode(auth.split(' ')[-1]).decode('utf8')
        except ValueError:
            return None
        if credentials == '':
            return ''
        _username, separator, password = credentials.partition(':')
        return password if separator else None

    def __call__(self, env: dict, responder: Callable) -> Iterator[bytes]:
        '''
        Respond to a WSGI server request.

        Responds with "401 Unauthorized" if the supplied password does not
        grant access to the request method, and with "404 Not Found" if no
        handler is registered for the first path segment. Otherwise, the
        request is delegated to the matching handler.
        '''
        method = env['REQUEST_METHOD']
        password = self._get_password(env)
        if (password is None
                or method not in self._allowed_methods.get(password, ())):
            responder('401 Unauthorized', [])
            return

        # Dispatch on the first path segment, e.g. "/artifacts/a/b" ->
        # "/artifacts".
        prefix = '/'.join(env.get('PATH_INFO', '').split('/')[:2])
        handler = self._request_handlers.get(prefix)
        if handler is None:
            responder('404 Not Found', [])
            return

        # The handlers are generators, so their bodies only run while being
        # iterated. Iterating them here (rather than returning them) keeps the
        # API's context active for the whole request, including deferred work.
        with using_context(self._context):
            yield from handler(env, responder)

    def serve(self, port: int = 8000) -> None:
        '''
        Start a server on the specified port.

        This method uses the reference WSGI server defined in the standard
        library. Other servers, which can be installed via `pip`, may be more
        robust and performant.
        '''
        with make_server('', port, self) as server: # type: ignore
            server.serve_forever()
#-- Artifact-request-handling --------------------------------------------------

class ArtifactAPI:
    '''
    An HTTP responder for artifact creation, access, and modification requests.

    Supported routes:

    - `GET /artifacts{/path*}`: Respond with the corresponding file or a
      shallow representation of the corresponding directory.
    - `POST /artifacts{/path*}`: Create a new artifact from a specification.
      A password with "write" permission is required.
    - `DELETE /artifacts{/path*}`: Delete the artifact at the given path.
      A password with "delete" permission is required.

    Analogous `HEAD` and `OPTIONS` requests are also supported.
    '''
    def __init__(self, prefix: str = '/artifacts') -> None:
        self._prefix = prefix
        self._root = get_context().root.resolve()

    def __call__(self, env: dict, responder: Callable) -> Iterator[bytes]:
        '''
        Respond to a WSGI request.

        Exceptions raised while computing the response are mapped to status
        codes: `ValueError` -> 400, `PermissionError` -> 403, other `OSError`s
        -> 404, and anything else -> 500 (with the traceback as the body).
        '''
        method = env['REQUEST_METHOD']
        handler = (
            self._handle_options_request if method == 'OPTIONS' else
            self._handle_head_request if method == 'HEAD' else
            self._handle_get_request if method == 'GET' else
            self._handle_post_request if method == 'POST' else
            self._handle_delete_request if method == 'DELETE' else
            self._handle_405_error)
        static_headers = [
            ('Access-Control-Allow-Origin', '*'),
            ('Cache-Control', 'no-cache')]

        # Only the handler call is guarded: once `responder` has been called,
        # calling it again for an error would itself be a WSGI violation.
        try:
            status, dynamic_headers, body, *work = handler(env)
        except ValueError:
            responder('400 Bad Request', static_headers)
            return
        except PermissionError: # Must precede `OSError`, its base class.
            responder('403 Forbidden', static_headers)
            return
        except OSError:
            responder('404 Not Found', static_headers)
            return
        except Exception:
            responder('500 Internal Server Error', static_headers)
            yield traceback.format_exc().encode('utf8')
            return

        responder(status, static_headers + dynamic_headers)
        yield body

        # Deferred work (e.g. artifact builds) runs after the body has been
        # handed to the server.
        for work_item in work:
            work_item()

    def _handle_options_request(self, env: dict) -> tuple:
        return ('204 No Content',
                [('Access-Control-Allow-Methods', '*'),
                 ('Access-Control-Allow-Headers', '*'),
                 ('Allow', 'OPTIONS, HEAD, GET, POST, DELETE')],
                b'')

    def _handle_head_request(self, env: dict) -> tuple:
        # Reuse the GET logic with an empty byte range so that no file content
        # has to be read. NOTE: the reported `Content-Length` consequently
        # describes the truncated read, not the full resource.
        env = {**env, 'HTTP_RANGE': 'bytes=0-0'}
        return (*self._handle_get_request(env)[:2], b'')

    def _handle_get_request(self, env: dict) -> tuple:
        path = self._get_path(env)
        if path.is_file():
            return self._handle_file_get_request(env, path)
        else:
            return self._handle_dir_get_request(env, path)

    def _handle_file_get_request(self, env: dict, path: Path) -> tuple:
        timestamp = get_file_timestamp(path)
        if get_last_request_time(env) >= timestamp:
            return ('304 Not Modified', [], b'')

        # Ranges are interpreted as `bytes={start}-{end}` with an exclusive
        # `end`; an omitted `end` means "to the end of the file". Malformed
        # ranges raise `ValueError`, which `__call__` maps to a 400 response.
        range_spec = env.get('HTTP_RANGE', '0-')
        if range_spec.startswith('bytes='):
            range_spec = range_spec[len('bytes='):]
        start_str, end_str = range_spec.split('-')
        start = int(start_str)
        end = int(end_str) if end_str != '' else 2**48

        body = read_file(path, start, end)
        return ('200 OK',
                [('Content-Type', get_content_type(path)),
                 ('Content-Length', str(len(body))),
                 ('Last-Modified', _format_http_date(timestamp))],
                body)

    def _handle_dir_get_request(self, env: dict, path: Path) -> tuple:
        timestamp = get_dir_timestamp(path)
        if get_last_request_time(env) >= timestamp:
            return ('304 Not Modified', [], b'')

        # Shallow representation: every entry name maps to `None`, alongside
        # the directory's metadata under "_meta_".
        entries = {name: None for name in (DynamicArtifact @ path)}
        body = cbor2.dumps({'_meta_': DirIndex(path).get_meta(), **entries})
        return ('200 OK',
                [('Content-Type', 'application/cbor'),
                 ('Content-Length', str(len(body))),
                 ('Last-Modified', _format_http_date(timestamp))],
                body)

    def _handle_post_request(self, env: dict) -> tuple:
        # Builder calls are queued rather than executed, and returned as
        # trailing tuple elements so that `__call__` can run them after
        # responding.
        work: List[Callable] = []
        original_builder = get_context().builder

        def delayed_builder(artifact: Artifact, spec: object) -> None:
            work.append(lambda: original_builder(artifact, spec))

        req_body_size = int(env.get('CONTENT_LENGTH', '0'))
        req_body = cbor2.loads(env['wsgi.input'].read(req_body_size))

        with using_context(get_context(), builder=delayed_builder):
            validate_path_strings(req_body)
            artifact = build(Artifact, req_body)

        uri = f'{self._prefix}/{artifact._path_.relative_to(self._root)}'
        headers = [('Content-Length', '0'), ('Location', uri)]
        return ('201 Created', headers, b'', *work)

    def _handle_delete_request(self, env: dict) -> tuple:
        # The tree is renamed into a temporary directory, which is removed
        # (along with its contents) when the `with` block exits.
        with TemporaryDirectory() as dst:
            self._get_path(env).rename(Path(dst, 'tree-to-delete'))
        return ('204 No Content', [], b'')

    def _handle_405_error(self, env: dict) -> tuple:
        return ('405 Method Not Allowed',
                [('Allow', 'OPTIONS, HEAD, GET, POST, DELETE')],
                b'')

    def _get_path(self, env: dict) -> Path:
        '''
        Map the request's `PATH_INFO` to a path under the root directory.

        Raises a `FileNotFoundError` if the directory index has no matching
        entry, and a `PermissionError` if the path lies outside the root.
        '''
        path = env['PATH_INFO'][len(self._prefix)+1:]
        path = (self._root / path).resolve()
        path = DirIndex(path.parent).get_entry_path(path.stem)
        if path is None:
            raise FileNotFoundError()
        elif self._root not in (*path.parents, path):
            raise PermissionError()
        else:
            return path


def _format_http_date(timestamp: float) -> str:
    '''
    Format a POSIX timestamp as an HTTP date, expressed in GMT/UTC.
    '''
    return (
        datetime.fromtimestamp(timestamp, tz=timezone.utc)
        .strftime('%a, %d %b %Y %H:%M:%S GMT'))


def get_file_timestamp(path: Path) -> float:
    '''
    Return the file's modification time, capped at two seconds ago.
    '''
    mtime = path.stat().st_mtime
    return min(mtime, datetime.now().timestamp() - 2)


def get_dir_timestamp(path: Path) -> float:
    '''
    Return the later of the modification times of the directory and its
    `_meta_.json` file (if present), capped at two seconds ago.
    '''
    try:
        meta_mtime = (path / '_meta_.json').stat().st_mtime
    except FileNotFoundError:
        meta_mtime = 0.0
    mtime = max(path.stat().st_mtime, meta_mtime)
    return min(mtime, datetime.now().timestamp() - 2)


def get_last_request_time(env: dict) -> float:
    '''
    Return the request's `If-Modified-Since` time as a POSIX timestamp, or
    `0.0` if the header is missing or malformed.
    '''
    try:
        time_format = '%a, %d %b %Y %H:%M:%S GMT'
        if_mod_header = env['HTTP_IF_MODIFIED_SINCE']
        parsed = datetime.strptime(if_mod_header, time_format)
        # The header is expressed in GMT, so it must not be interpreted as
        # local time.
        return parsed.replace(tzinfo=timezone.utc).timestamp()
    except (KeyError, ValueError):
        return 0.0


def get_content_type(path: Path) -> str:
    '''
    Return the MIME type corresponding to the path's suffix, defaulting to
    "application/octet-stream".
    '''
    return ('application/cbor' if path.suffix == '.cbor'
            else mimetypes.types_map.get(path.suffix,
                                         'application/octet-stream'))


def read_file(path: Path, start: int, end: int) -> bytes:
    '''
    Read a file, locking the first 128 bytes while reading them.

    Returns the bytes in `[start, end)`, clipped to the end of the file. A
    `ValueError` is raised if that range has a negative length.
    '''
    buf = bytearray(min(end, path.stat().st_size) - start)
    with cast(BufferedReader, open(path, 'rb')) as f:
        if locking_is_supported:
            lockf(f, LOCK_SH, 128)
        try:
            header = cast(bytes, f.read(128))
        finally:
            if locking_is_supported:
                lockf(f, LOCK_UN)
        f.seek(start)
        f.readinto(buf)

    # Overlay the part of the locked header read that falls inside the
    # requested range, clipped so the result never outgrows that range.
    overlay = header[start : start + len(buf)]
    buf[:len(overlay)] = overlay
    return bytes(buf)


def validate_path_strings(obj: object) -> None:
    '''
    Raise a `PermissionError` if `obj` contains any path strings
    outside the root artifact directory.

    Strings starting with "@/" are treated as paths relative to the root.
    Lists and dictionary values are checked recursively, and a dictionary's
    `_path_` entry must start with "@/".
    '''
    root = get_context().root.resolve()

    if isinstance(obj, str) and obj.startswith('@/'):
        path = (root / obj[2:]).resolve()
        if root not in (*path.parents, path):
            raise PermissionError(f'`{path}` is not in `{root}`')

    elif isinstance(obj, list):
        for v in obj:
            validate_path_strings(v)

    elif isinstance(obj, dict):
        if '_path_' in obj:
            path_str = obj['_path_']
            if not path_str.startswith('@/'):
                raise PermissionError('`_path_` must start with "@/"')
            path = (root / path_str[2:]).resolve()
            if root not in (*path.parents, path):
                raise PermissionError(f'`{path}` is not in `{root}`')
        for v in obj.values():
            validate_path_strings(v)



#-- Schema-request-handling ----------------------------------------------------

class SchemaAPI:
    '''
    An HTTP responder for JSON schema requests.

    Supported routes:

    - `GET /schemas/spec`: Returns `get_spec_schema()`.
    - `GET /schemas/spec-list`: Returns `get_spec_list_schema()`.
    - `GET /schemas/spec-dict`: Returns `get_spec_dict_schema()`.

    Analogous `HEAD` and `OPTIONS` requests are also supported.
    '''
    def __init__(self) -> None:
        # The schemas are encoded once, at construction time.
        encode = lambda x: json.dumps(x, indent=2).encode('utf8')
        self._schemas = {
            '/schemas/spec': encode(get_spec_schema()),
            '/schemas/spec-list': encode(get_spec_list_schema()),
            '/schemas/spec-dict': encode(get_spec_dict_schema())}

    def __call__(self, env: dict, responder: Callable) -> Iterator[bytes]:
        method = env['REQUEST_METHOD']
        handler = (
            self._handle_options_request if method == 'OPTIONS' else
            self._handle_head_request if method == 'HEAD' else
            self._handle_get_request if method == 'GET' else
            self._handle_405_error)
        static_headers = [
            ('Access-Control-Allow-Origin', '*'),
            ('Cache-Control', 'no-cache')]
        status, dynamic_headers, body = handler(env)
        responder(status, static_headers + dynamic_headers)
        yield body

    def _handle_options_request(self, env: dict) -> tuple:
        return ('204 No Content',
                [('Access-Control-Allow-Methods', '*'),
                 ('Access-Control-Allow-Headers', '*'),
                 ('Allow', 'OPTIONS, HEAD, GET')],
                b'')

    def _handle_head_request(self, env: dict) -> tuple:
        return (*self._handle_get_request(env)[:2], b'')

    def _handle_get_request(self, env: dict) -> tuple:
        try:
            schema = self._schemas[env['PATH_INFO']]
            return ('200 OK',
                    [('Content-Type', 'application/schema+json'),
                     ('Content-Length', str(len(schema)))],
                    schema)
        except KeyError:
            return ('404 Not Found', [], b'')

    def _handle_405_error(self, env: dict) -> tuple:
        return ('405 Method Not Allowed',
                [('Allow', 'OPTIONS, HEAD, GET')],
                b'')



#-- UI-request-handling --------------------------------------------------------

class WebUI:
    '''
    An HTTP responder for HTML UI requests.

    The `styles`, `scripts`, and `asset_builders` arguments are accepted but
    not yet used.
    '''
    # TODO: Make this more than a placeholder.

    def __init__(self, *,
                 prefix: str = '/ui',
                 styles: Optional[List[Union[Path, str]]] = None,
                 scripts: Optional[List[Union[Path, str]]] = None,
                 asset_builders: Optional[List[Tuple[str, Callable]]] = None
                 ) -> None:
        self._prefix = prefix

    def __call__(self, env: dict, responder: Callable) -> Iterator[bytes]:
        method = env['REQUEST_METHOD']
        handler = (
            self._handle_options_request if method == 'OPTIONS' else
            self._handle_head_request if method == 'HEAD' else
            self._handle_get_request if method == 'GET' else
            self._handle_405_error)
        static_headers = [
            ('Access-Control-Allow-Origin', '*'),
            ('Cache-Control', 'no-cache')]
        status, dynamic_headers, body = handler(env)
        responder(status, static_headers + dynamic_headers)
        yield body

    def _handle_options_request(self, env: dict) -> tuple:
        return ('204 No Content',
                [('Access-Control-Allow-Methods', '*'),
                 ('Access-Control-Allow-Headers', '*'),
                 ('Allow', 'OPTIONS, HEAD, GET')],
                b'')

    def _handle_head_request(self, env: dict) -> tuple:
        return (*self._handle_get_request(env)[:2], b'')

    def _handle_get_request(self, env: dict) -> tuple:
        '''
        Serve the index page or a file from the bundled `_ui` directory.

        Requests for anything else, including asset paths that escape the
        `_ui` directory or do not exist, receive a "404 Not Found" response.
        '''
        not_found = ('404 Not Found', [], b'')
        path = env['PATH_INFO'][len(self._prefix)+1:]
        core_asset_path = Path(__file__).parent / '_ui'

        if path in ('', '/'):
            content_type = 'text/html'
            content = (
                (core_asset_path / 'index.html').read_text()
                .replace('{{extensionStyles}}', '')
                .replace('{{extensionScripts}}', '')
                .replace('{{refreshInterval}}', '1')
                .encode())

        elif path.startswith('_core-assets/'):
            resource_name = path.replace('_core-assets/', '', 1)
            try:
                # `resource_name` is client-controlled, so ".." segments and
                # absolute paths must not be allowed to escape the asset
                # directory.
                asset_root = core_asset_path.resolve()
                asset_path = (asset_root / resource_name).resolve()
                if asset_root not in asset_path.parents:
                    return not_found
                content = asset_path.read_bytes()
            except (OSError, ValueError):
                return not_found
            content_type = get_content_type(Path(resource_name))

        else:
            return not_found

        return ('200 OK',
                [('Content-Type', content_type),
                 ('Content-Length', str(len(content)))],
                content)

    def _handle_405_error(self, env: dict) -> tuple:
        return ('405 Method Not Allowed',
                [('Allow', 'OPTIONS, HEAD, GET')],
                b'')

    def asset_builder(self, uri_template: str) -> Callable[[T], T]:
        raise NotImplementedError()


# Registry populated by the module-level `asset_builder` decorator.
default_asset_builders: List[Tuple[str, Callable]] = []


def asset_builder(uri_template: str) -> Callable[[T], T]:
    '''
    Return a decorator that registers a callable in `default_asset_builders`
    under `uri_template` and returns its argument unchanged. Non-callable
    arguments are returned without being registered.
    '''
    def add_builder(builder: T) -> T:
        if callable(builder):
            default_asset_builders.append(
                (uri_template, builder))
        return builder
    return add_builder