"""Top-level package for PyCAPIO.
PyCAPIO brings transparent data streaming to file-based Python workflows by
*monkey patching* Python's built-in I/O entry points (:func:`open`,
:func:`os.mkdir`, :func:`os.scandir`, ...) and routing any access that targets
the configured CAPIO directory through the native CAPIO server instead of the
operating system.
The public surface of this module is intentionally small:
* :func:`CapioContext` -- a decorator factory that initialises CAPIO and
  patches the built-in I/O functions for the duration of each decorated call.
* the ``*_proxy`` callables (:func:`open_proxy`, :func:`mkdir_proxy`, ...) --
  drop-in replacements for the corresponding built-ins that dispatch to CAPIO
  when a path lives inside the CAPIO directory and fall back to the original
  implementation otherwise.
Everything else re-exported here (``pycapio_open``, ``PyCapioPath``,
``FILE_MODES``, the IO wrappers, ...) comes from the compiled extension
:mod:`pycapio._pycapio` and is documented in the *Native API* section.
Example:
    Intercept only the I/O performed inside a single function::

        from pycapio import CapioContext

        @CapioContext(capio_dir=".", app_name="reader",
                      workflow_name="example_workflow")
        def read(path):
            with open(path, "r") as f:
                return f.read()
"""
import io
import os
from functools import wraps
from os import path
from typing import Any
import atexit
import builtins
from ._pycapio import *
# Public names exported by ``from pycapio import *``: every non-underscore
# name visible at this point (i.e. right after the star-import of the native
# extension) plus ``CapioContext``, which is defined further down the module.
#
# The keys are snapshotted with ``list(...)`` before filtering: iterating
# ``globals()`` directly inside a module-level comprehension may raise
# "RuntimeError: dictionary changed size during iteration" on Python 3.12+
# when the import runs under tracing (e.g. coverage measurement).
__all__ = [name for name in list(globals()) if not name.startswith("_")] + ["CapioContext"]
def _dump_context() -> dict[str, Any]:
    """Snapshot the I/O entry points that are bound right now.

    Returns:
        A dict keyed by dotted name (``"builtins.open"``, ``"os.mkdir"``,
        ``"os.makedirs"``, ``"os.scandir"``, ``"io.open"``, ``"os.listdir"``
        and ``"os.path"``) whose values are the objects currently bound to
        those attributes. Handing the dict to :func:`_restore_context`
        rebinds them.
    """
    snapshot: dict[str, Any] = {}
    snapshot["builtins.open"] = builtins.open
    snapshot["os.mkdir"] = os.mkdir
    snapshot["os.makedirs"] = os.makedirs
    snapshot["os.scandir"] = os.scandir
    snapshot["io.open"] = io.open
    snapshot["os.listdir"] = os.listdir
    snapshot["os.path"] = os.path
    return snapshot
def _restore_context(context: dict[str, Any]):
    """Rebind the I/O entry points to the objects stored in ``context``.

    Args:
        context: A snapshot with the keys produced by :func:`_dump_context`.
            A missing key raises :class:`KeyError`; attributes listed before
            the missing one have already been rebound at that point.
    """
    # (owner module, attribute name, snapshot key), applied in this order.
    bindings = (
        (builtins, "open", "builtins.open"),
        (os, "mkdir", "os.mkdir"),
        (os, "makedirs", "os.makedirs"),
        (os, "scandir", "os.scandir"),
        (io, "open", "io.open"),
        (os, "listdir", "os.listdir"),
        (os, "path", "os.path"),
    )
    for owner, attribute, key in bindings:
        setattr(owner, attribute, context[key])
def _patch_context():
    """Install the CAPIO-aware proxies over the standard I/O entry points.

    Once this returns, ``builtins.open`` and ``io.open`` are bound to
    :func:`open_proxy`, ``os.mkdir`` to :func:`mkdir_proxy`, ``os.makedirs``
    to :func:`makedirs_proxy`, ``os.scandir`` to :func:`scandir_proxy`,
    ``os.listdir`` to :func:`listdir_proxy`, and ``os.path`` to the native
    :class:`PyCapioPath` object.
    """
    # (owner module, attribute name, replacement), applied in this order.
    replacements = (
        (builtins, "open", open_proxy),
        (os, "mkdir", mkdir_proxy),
        (os, "makedirs", makedirs_proxy),
        (os, "scandir", scandir_proxy),
        (io, "open", open_proxy),
        (os, "listdir", listdir_proxy),
        (os, "path", PyCapioPath),
    )
    for owner, attribute, replacement in replacements:
        setattr(owner, attribute, replacement)
# Process-wide flag: set to True by the CapioContext wrapper right after
# pycapio_init() returns, so later decorated calls skip initialisation.
py_capio_initialized = False
"""bool: ``True`` once :func:`pycapio_init` has run in this process.
CAPIO is initialised at most once per process; the first
:func:`CapioContext` invocation flips this flag so later ones reuse the running
server.
"""
# Snapshot of the I/O entry points taken when this module is imported. The
# *_proxy functions call through this mapping to reach the implementations
# that were bound at that moment, even while the proxies are installed.
_BUILTIN_STACK: dict[str, Any] = _dump_context()
# Absolute path of the CAPIO directory; stays None until the first
# CapioContext-decorated call initialises CAPIO.
_CAPIO_DIR: str | None = None
def scandir_proxy(path: str = "."):
    """CAPIO-aware replacement for :func:`os.scandir`.

    Args:
        path: Directory to scan. Defaults to ``"."`` like :func:`os.scandir`.
            Values that are not ``str``, ``bytes`` or :class:`os.PathLike`
            (for example file descriptors) are passed straight to the
            original :func:`os.scandir`.

    Returns:
        A :class:`PyCapioScandirWrapper` when ``path`` resolves to the active
        CAPIO directory or to a location below it, otherwise the result of
        the original :func:`os.scandir`.
    """
    original_scandir = _BUILTIN_STACK["os.scandir"]

    # Nothing to intercept without an active CAPIO directory, and anything
    # that is not path-like cannot be mapped to a location.
    if not _CAPIO_DIR or not isinstance(path, (str, bytes, os.PathLike)):
        return original_scandir(path)

    # Decode first so bytes paths can be compared against the str root.
    target_path = _BUILTIN_STACK["os.path"].abspath(os.fsdecode(path))

    # Compare on a separator boundary: a bare ``startswith(root)`` would also
    # claim sibling directories such as ``<root>_backup``.
    capio_prefix = _CAPIO_DIR.rstrip(os.sep) + os.sep
    if (target_path.rstrip(os.sep) + os.sep).startswith(capio_prefix):
        # NOTE(review): the caller's path is forwarded as given (not made
        # absolute), whereas listdir_proxy hands the wrapper an absolute
        # path -- confirm which form PyCapioScandirWrapper expects.
        return PyCapioScandirWrapper(path)
    return original_scandir(path)
def open_proxy(*args, **kwargs):
    """CAPIO-aware replacement for the built-in :func:`open`.

    The file is taken from the first positional argument or, when none is
    given, from the ``file`` keyword. The Python ``mode`` string (``"r"``,
    ``"w+"``, ``"ab"``, ...) is translated into CAPIO open flags looked up in
    :data:`FILE_MODES` before delegating to :func:`pycapio_open`.

    The original :func:`open` is used unchanged when any of the following hold:

    * the file argument is not ``str``, ``bytes`` or :class:`os.PathLike`;
    * no CAPIO directory is active;
    * the target path lies outside the CAPIO directory;
    * the target path contains ``.python_history``.

    Args:
        *args: Positional arguments accepted by :func:`open`; ``args[0]`` is
            the path and the optional ``args[1]`` is the mode.
        **kwargs: Keyword arguments accepted by :func:`open` (notably ``file``
            and ``mode``). A ``mode`` keyword takes precedence over
            ``args[1]``.

    Returns:
        A :class:`PyCapioBinaryIOWrapper` when the mode contains ``"b"`` or a
        :class:`PyCapioTextIOWrapper` otherwise, when the path is handled by
        CAPIO; otherwise whatever the original :func:`open` returns.
    """
    builtin_open = _BUILTIN_STACK["builtins.open"]

    # ``open(file=...)`` is legal, so consult the keyword when there is no
    # positional argument.
    file_arg = args[0] if args else kwargs.get("file")
    if not _CAPIO_DIR or not isinstance(file_arg, (str, bytes, os.PathLike)):
        return builtin_open(*args, **kwargs)

    # Decode first so bytes paths can be compared against the str root.
    target_path = _BUILTIN_STACK["os.path"].abspath(os.fsdecode(file_arg))

    # Compare on a separator boundary: a bare ``startswith(root)`` would also
    # claim sibling directories such as ``<root>_backup``.
    capio_prefix = _CAPIO_DIR.rstrip(os.sep) + os.sep
    inside_capio = (target_path.rstrip(os.sep) + os.sep).startswith(capio_prefix)
    if not inside_capio or ".python_history" in target_path:
        return builtin_open(*args, **kwargs)

    mode = kwargs.get("mode", args[1] if len(args) > 1 else "r")

    # Access mode: "+" means read/write; otherwise pick write-only or
    # read-only from the mode letters.
    if "+" in mode:
        pycapio_flags = FILE_MODES["O_RDWR"]
    else:
        pycapio_flags = 0
        if "w" in mode or "a" in mode:
            pycapio_flags |= FILE_MODES["O_WRONLY"]
        if "r" in mode:
            pycapio_flags |= FILE_MODES["O_RDONLY"]

    # NOTE(review): "w" adds only O_CREAT (no truncate flag) and "a" adds only
    # O_APPEND (no O_CREAT); "x" is not translated at all. Confirm this
    # matches what the CAPIO server expects.
    if "w" in mode:
        pycapio_flags |= FILE_MODES["O_CREAT"]
    if "a" in mode:
        pycapio_flags |= FILE_MODES["O_APPEND"]

    fd = pycapio_open(target_path, pycapio_flags, 0o777)
    if "b" in mode:
        return PyCapioBinaryIOWrapper(fd)
    return PyCapioTextIOWrapper(fd)
def mkdir_proxy(path_val, mode=0o777, *args, **kwargs):
    """CAPIO-aware replacement for :func:`os.mkdir`.

    Args:
        path_val: Directory to create (``str``, ``bytes`` or path-like).
        mode: Mode value passed to :func:`pycapio_mkdir` or to the original
            :func:`os.mkdir`.
        *args: Extra positional arguments forwarded to the original
            :func:`os.mkdir` on the fallback path.
        **kwargs: Extra keyword arguments forwarded to the original
            :func:`os.mkdir` on the fallback path.

    Returns:
        The result of :func:`pycapio_mkdir` (called with the absolute path)
        when ``path_val`` resolves to the active CAPIO directory or to a
        location below it, otherwise the result of the original
        :func:`os.mkdir`.
    """
    if _CAPIO_DIR:
        # Decode first so bytes paths can be compared against the str root.
        target_path = _BUILTIN_STACK["os.path"].abspath(os.fsdecode(path_val))

        # Compare on a separator boundary: a bare ``startswith(root)`` would
        # also claim sibling directories such as ``<root>_backup``.
        capio_prefix = _CAPIO_DIR.rstrip(os.sep) + os.sep
        if (target_path.rstrip(os.sep) + os.sep).startswith(capio_prefix):
            return pycapio_mkdir(target_path, mode)

    return _BUILTIN_STACK["os.mkdir"](path_val, mode, *args, **kwargs)
def makedirs_proxy(path_val, mode=0o777, *args, **kwargs):
    """CAPIO-aware replacement for :func:`os.makedirs`.

    Note:
        Inside the CAPIO directory this makes a single :func:`pycapio_mkdir`
        call with the absolute path; no intermediate directories are created
        from Python and extra arguments such as ``exist_ok`` are not used on
        that branch.

    Args:
        path_val: Directory path to create (``str``, ``bytes`` or path-like).
        mode: Mode value passed to :func:`pycapio_mkdir` or to the original
            :func:`os.makedirs`.
        *args: Extra positional arguments forwarded to the original
            :func:`os.makedirs` on the fallback path.
        **kwargs: Extra keyword arguments forwarded to the original
            :func:`os.makedirs` on the fallback path.

    Returns:
        The result of :func:`pycapio_mkdir` when ``path_val`` resolves to the
        active CAPIO directory or to a location below it, otherwise the
        result of the original :func:`os.makedirs`.
    """
    if _CAPIO_DIR:
        # Decode first so bytes paths can be compared against the str root.
        target_path = _BUILTIN_STACK["os.path"].abspath(os.fsdecode(path_val))

        # Compare on a separator boundary: a bare ``startswith(root)`` would
        # also claim sibling directories such as ``<root>_backup``.
        capio_prefix = _CAPIO_DIR.rstrip(os.sep) + os.sep
        if (target_path.rstrip(os.sep) + os.sep).startswith(capio_prefix):
            return pycapio_mkdir(target_path, mode)

    return _BUILTIN_STACK["os.makedirs"](path_val, mode, *args, **kwargs)
def listdir_proxy(dirpath: str = "."):
    """CAPIO-aware replacement for :func:`os.listdir`.

    Args:
        dirpath: Directory whose entries should be listed. Defaults to
            ``"."`` like :func:`os.listdir`. Values that are not ``str``,
            ``bytes`` or :class:`os.PathLike` (``None``, file descriptors)
            are passed straight to the original :func:`os.listdir`.

    Returns:
        A list of entry names. When ``dirpath`` resolves to the active CAPIO
        directory or to a location below it, the names are collected by
        iterating a :class:`PyCapioScandirWrapper` built from the absolute
        path; otherwise the original :func:`os.listdir` is called with the
        caller's argument.
    """
    original_listdir = _BUILTIN_STACK["os.listdir"]

    # Nothing to intercept without an active CAPIO directory, and anything
    # that is not path-like cannot be mapped to a location.
    if not _CAPIO_DIR or not isinstance(dirpath, (str, bytes, os.PathLike)):
        return original_listdir(dirpath)

    # Decode first so bytes paths can be compared against the str root.
    target_path = _BUILTIN_STACK["os.path"].abspath(os.fsdecode(dirpath))

    # Compare on a separator boundary: a bare ``startswith(root)`` would also
    # claim sibling directories such as ``<root>_backup``.
    capio_prefix = _CAPIO_DIR.rstrip(os.sep) + os.sep
    if (target_path.rstrip(os.sep) + os.sep).startswith(capio_prefix):
        return [entry.name for entry in PyCapioScandirWrapper(target_path)]
    return original_listdir(dirpath)
[docs]
def CapioContext(*,
                 capio_dir=".",
                 app_name=CAPIO_DEFAULT_APP_NAME,
                 workflow_name=CAPIO_DEFAULT_WORKFLOW_NAME,
                 silent=True,
                 server_exec_path="capio_server",
                 capio_cl_configuration_file="",
                 await_server_timeout_seconds=2,
                 teardown_server=True
                 ):
    """Decorator factory that runs a function with CAPIO interception enabled.

    On the first decorated call in a process the wrapper calls
    :func:`pycapio_init` and registers :func:`pycapio_teardown` via
    :mod:`atexit`. Later decorated calls skip that step, so the
    initialisation arguments of whichever decorated function runs first are
    the ones that take effect. On every call the wrapper swaps the standard
    I/O entry points for their CAPIO-aware proxies, runs the function, and
    restores the previous bindings afterwards -- also when the function
    raises -- so interception is scoped to the decorated call only.

    All arguments are keyword-only.

    Args:
        capio_dir: Root directory managed by CAPIO. It is made absolute and
            passed to :func:`pycapio_init` as ``CAPIO_DIR``; I/O on paths
            inside it is intercepted, everything else falls back to the
            standard library. Defaults to ``"."``.
        app_name: Passed to :func:`pycapio_init` as ``CAPIO_APP_NAME``.
            Defaults to :data:`CAPIO_DEFAULT_APP_NAME`.
        workflow_name: Passed to :func:`pycapio_init` as
            ``CAPIO_WORKFLOW_NAME``. Defaults to
            :data:`CAPIO_DEFAULT_WORKFLOW_NAME`.
        silent: Sets the ``SILENT`` environment variable to ``"ON"`` when
            true (default) and to ``"OFF"`` otherwise, on every decorated
            call.
        server_exec_path: Passed to :func:`pycapio_init` as
            ``capio_server_exec_path``. Defaults to ``"capio_server"``.
        capio_cl_configuration_file: Passed to :func:`pycapio_init` under the
            same name. Defaults to the empty string.
        await_server_timeout_seconds: Passed to :func:`pycapio_init` under
            the same name. Defaults to ``2``.
        teardown_server: Argument handed to :func:`pycapio_teardown` when it
            runs at process exit. Defaults to ``True``.

    Returns:
        A decorator that wraps the target function with CAPIO setup,
        I/O patching and cleanup.

    Example:
        ::

            @CapioContext(capio_dir="./data", app_name="writer")
            def produce(path, payload):
                with open(path, "w") as f:
                    f.write(payload)
    """
    def _CapioContext(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            global py_capio_initialized
            global _CAPIO_DIR

            os.environ["SILENT"] = "ON" if silent else "OFF"

            if not py_capio_initialized:
                resolved_dir = _BUILTIN_STACK["os.path"].abspath(capio_dir)
                pycapio_init(CAPIO_DIR=resolved_dir,
                             CAPIO_WORKFLOW_NAME=workflow_name,
                             CAPIO_APP_NAME=app_name,
                             capio_server_exec_path=server_exec_path,
                             capio_cl_configuration_file=capio_cl_configuration_file,
                             await_server_timeout_seconds=await_server_timeout_seconds)
                # Publish the module state only once initialisation has
                # succeeded, so a failed init does not leave a CAPIO
                # directory set while the flag still says "not initialised".
                _CAPIO_DIR = resolved_dir
                py_capio_initialized = True
                atexit.register(pycapio_teardown, teardown_server)

            context = _dump_context()
            try:
                # Patching happens inside the ``try`` so that a failure part
                # way through is rolled back by the ``finally`` as well.
                _patch_context()
                return func(*args, **kwargs)
            finally:
                _restore_context(context)
        return wrapper
    return _CapioContext