from __future__ import annotations
from typing import Any, SupportsIndex
from janus.logger import logger
from janus.registry import CONTAINER_REGISTRY, wrap_value
from janus.tachyon_rs import TachyonEngine, TrackedDictCore, TrackedListCore
[docs]
class TrackedList(list[Any]):
"""
A list subclass that automatically logs mutations to the Janus engine.
TrackedList intercepts methods like `append`, `extend`, `insert`, `pop`,
`clear`, and `__setitem__` to ensure the Janus engine can track and
revert changes to the collection.
"""
def __init__(
self, items: list[Any], engine: TachyonEngine, name: str, owner: Any = None
) -> None:
"""
Initialize a tracked list.
Args:
items: The initial items for the list.
engine: The TachyonEngine instance for logging.
name: The attribute name or path of this container.
owner: The JanusBase object that owns this container.
"""
super().__init__(items)
self._core = TrackedListCore(engine, name)
self._engine = engine
self._owner = owner if owner is not None else getattr(engine, "owner", None)
self._name = name
self._silent = False
@property
def _is_silent(self) -> bool:
return self._silent or getattr(self._owner, "_restoring", False)
[docs]
def append(self, value: Any) -> None:
wrapped = wrap_value(
value, self._engine, f"{self._name}[{len(self)}]", owner=self._owner
)
super().append(wrapped)
if not self._is_silent:
# Snapshot for DAG
dag_val = wrapped
if isinstance(wrapped, list):
dag_val = list(wrapped)
elif isinstance(wrapped, dict):
dag_val = dict(wrapped)
self._core.log_insert(len(self) - 1, dag_val)
logger.trace(f"TrackedList ({self._name}) appended item")
[docs]
def extend(self, values: Any) -> None:
start_idx = len(self)
wrapped_values = [
wrap_value(
v, self._engine, f"{self._name}[{start_idx + i}]", owner=self._owner
)
for i, v in enumerate(values)
]
super().extend(wrapped_values)
if not self._is_silent:
self._core.log_extend(wrapped_values)
logger.trace(
f"TrackedList ({self._name}) extended with {len(values)} items"
)
[docs]
def insert(self, index: SupportsIndex, value: Any) -> None:
wrapped = wrap_value(
value, self._engine, f"{self._name}[{int(index)}]", owner=self._owner
)
super().insert(index, wrapped)
if not self._is_silent:
# Snapshot for DAG
dag_val = wrapped
if isinstance(wrapped, list):
dag_val = list(wrapped)
elif isinstance(wrapped, dict):
dag_val = dict(wrapped)
self._core.log_insert(int(index), dag_val)
[docs]
def pop(self, index: SupportsIndex = -1) -> Any:
# Convert index to int to perform arithmetic
int_index = int(index)
if int_index < 0:
int_index = len(self) + int_index
value = super().pop(int_index)
if not self._is_silent:
self._core.log_pop(int_index, value)
logger.trace(f"TrackedList ({self._name}) popped item at {int_index}")
return value
[docs]
def clear(self) -> None:
old_values = list(self)
super().clear()
if not self._is_silent:
self._core.log_clear(old_values)
def __setitem__(self, index: Any, value: Any) -> None:
old_value = self[index]
wrapped = wrap_value(
value, self._engine, f"{self._name}[{index}]", owner=self._owner
)
super().__setitem__(index, wrapped)
if not self._is_silent:
self._core.log_replace(index, old_value, wrapped)
def __delitem__(self, index: Any) -> None:
value = self[index]
super().__delitem__(index)
if not self._is_silent:
self._core.log_pop(index, value)
[docs]
def remove(self, value: Any) -> None:
index = self.index(value)
super().remove(value)
if not self._is_silent:
self._core.log_pop(index, value)
[docs]
class TrackedDict(dict[Any, Any]):
"""
A dict subclass that automatically logs mutations to the Janus engine.
TrackedDict intercepts key assignments, deletions, and updates to ensure
the Janus engine can track and revert changes to the collection.
"""
def __init__(
self, items: dict[str, Any], engine: TachyonEngine, name: str, owner: Any = None
) -> None:
"""
Initialize a tracked dictionary.
Args:
items: The initial key-value pairs.
engine: The TachyonEngine instance for logging.
name: The attribute name or path of this container.
owner: The JanusBase object that owns this container.
"""
super().__init__(items)
self._core = TrackedDictCore(engine, name)
self._engine = engine
self._owner = owner if owner is not None else getattr(engine, "owner", None)
self._name = name
self._silent = False
@property
def _is_silent(self) -> bool:
return self._silent or getattr(self._owner, "_restoring", False)
def __setitem__(self, key: Any, value: Any) -> None:
try:
old_value = self[key]
except KeyError:
old_value = None
wrapped = wrap_value(
value, self._engine, f"{self._name}.{key}", owner=self._owner
)
super().__setitem__(key, wrapped)
if not self._is_silent:
# Snapshot for DAG
dag_val = wrapped
if isinstance(wrapped, list):
dag_val = list(wrapped)
elif isinstance(wrapped, dict):
dag_val = dict(wrapped)
dag_old = old_value
if isinstance(old_value, list):
dag_old = list(old_value)
elif isinstance(old_value, dict):
dag_old = dict(old_value)
self._core.log_update([str(key)], [dag_old], [dag_val])
def __delitem__(self, key: Any) -> None:
old_value = self[key]
super().__delitem__(key)
if not self._is_silent:
self._core.log_delete(str(key), old_value)
logger.trace(f"TrackedDict ({self._name}) deleted key: {key}")
[docs]
def update(self, other: Any = (), /, **kwargs: Any) -> None:
actual_other = other if hasattr(other, "keys") else dict(other)
actual_other.update(kwargs)
keys: list[str] = []
old_vals: list[Any] = []
new_vals: list[Any] = []
for k, v in actual_other.items():
keys.append(str(k))
old_vals.append(self.get(k))
wrapped = wrap_value(
v, self._engine, f"{self._name}.{k}", owner=self._owner
)
new_vals.append(wrapped)
super().__setitem__(k, wrapped)
if not self._is_silent:
self._core.log_update(keys, old_vals, new_vals)
[docs]
def pop(self, key: str, default: Any = None) -> Any:
if key in self:
val = self[key]
res = super().pop(key)
if not self._is_silent:
self._core.log_pop(str(key), val)
return res
return default
[docs]
def popitem(self) -> tuple[str, Any]:
key, value = super().popitem()
if not self._is_silent:
self._core.log_pop(str(key), value)
return key, value
[docs]
def setdefault(self, key: Any, default: Any = None) -> Any:
if key not in self:
wrapped = wrap_value(
default, self._engine, f"{self._name}.{key}", owner=self._owner
)
self[key] = wrapped
return self[key]
[docs]
def clear(self) -> None:
keys = [str(k) for k in self.keys()]
values = list(self.values())
super().clear()
if not self._is_silent:
self._core.log_clear(keys, values)
# Register containers to avoid circular imports in registry.py
CONTAINER_REGISTRY["list"] = TrackedList
CONTAINER_REGISTRY["dict"] = TrackedDict