Source code for janus.persistence

from __future__ import annotations

import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

import msgpack

from janus.logger import logger

try:
    import pandas as pd

    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False

from janus.registry import CONTAINER_REGISTRY

if TYPE_CHECKING:
    from janus.base import JanusBase


[docs] def janus_encoder(obj: Any) -> Any: """Custom encoder for Janus-tracked objects to ensure msgpack compatibility.""" # Check for pandas/numpy without causing circular imports or requiring them # We look at the class name to avoid direct isinstance checks if possible, # or import locally. cls_name = type(obj).__name__ if cls_name in ("TrackedDataFrame", "DataFrame"): return { "__janus_type__": "pd.DataFrame", "data": obj.to_dict(orient="split"), } if cls_name in ("TrackedSeries", "Series"): return { "__janus_type__": "pd.Series", "data": obj.to_dict(), "name": getattr(obj, "name", None), } dict_cls = CONTAINER_REGISTRY.get("dict") if dict_cls and isinstance(obj, dict_cls): return dict(cast("dict[Any, Any]", obj)) list_cls = CONTAINER_REGISTRY.get("list") if list_cls and isinstance(obj, list_cls): return list(cast("list[Any]", obj)) return obj
[docs] def janus_decoder(obj: Any) -> Any: """Custom decoder for Janus-tracked objects to re-hydrate during load.""" if isinstance(obj, dict) and "__janus_type__" in obj and PANDAS_AVAILABLE: if obj["__janus_type__"] == "pd.DataFrame": return pd.DataFrame(**obj["data"]) if obj["__janus_type__"] == "pd.Series": return pd.Series(obj["data"], name=obj.get("name")) return obj
[docs] class JanusPersistence: """Handles serialization and persistence of Janus state histories."""
[docs] @staticmethod def save(obj: JanusBase, path: str | Path) -> None: """Persist the entire multiverse/timeline history to a .jns file.""" path = Path(path) if path.suffix != ".jns": path = path.with_suffix(".jns") logger.info(f"Saving multiversal history to {path}") # 1. Get DAG from Rust dag_state = obj._engine.get_graph_state() # 2. Extract Python context (shadow snapshots for plugins) context = {k: v for k, v in obj.__dict__.items() if k.startswith("_shadow_")} # 3. Serialize with zipfile.ZipFile(path, "w", compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr( "dag.msgpack", msgpack.packb(dag_state, default=janus_encoder, use_bin_type=True), ) zf.writestr( "context.msgpack", msgpack.packb(context, default=janus_encoder, use_bin_type=True), )
[docs] @staticmethod def load(obj: JanusBase, path: str | Path) -> None: """Restore history and state from a .jns file.""" path = Path(path) if not path.exists(): logger.error(f"Failed to load: Persistence file not found -> {path}") raise FileNotFoundError(f"Persistence file not found: {path}") logger.info(f"Loading multiversal history from {path}") with zipfile.ZipFile(path, "r") as zf: dag_data = msgpack.unpackb( zf.read("dag.msgpack"), object_hook=janus_decoder, strict_map_key=False, raw=False, ) ctx_data = msgpack.unpackb( zf.read("context.msgpack"), object_hook=janus_decoder, strict_map_key=False, raw=False, ) # 1. Restore Rust engine state obj._engine.set_graph_state(dag_data) obj._restoring = True try: # 2. Re-hydrate Python context for k, v in ctx_data.items(): setattr(obj, k, v) # 3. Synchronize live objects with the loaded head node obj._engine.sync_from_root() # 4. Re-link top-level attributes to ensure they are tracked for name, value in obj.__dict__.items(): if not name.startswith("_"): setattr(obj, name, value) finally: obj._restoring = False