"""JSON persistence helpers for small agent state files (no pickle at runtime)."""
import collections
import functools
import json
import logging
import os
from asyncio import iscoroutinefunction
from typing import Any, Callable, Union
logger = logging.getLogger(__name__)
def _to_jsonable(obj: Any) -> Any:
if isinstance(obj, collections.deque):
return [_to_jsonable(item) for item in obj]
if isinstance(obj, dict):
return {k: _to_jsonable(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_to_jsonable(item) for item in obj]
return obj
def _dump(path, obj):
"""Atomically write ``obj`` to ``path`` as JSON."""
payload = json.dumps(_to_jsonable(obj))
tmp = "{}.tmp".format(path)
with open(tmp, "w", encoding="utf-8") as w:
w.write(payload)
os.replace(tmp, path)
def serialize_attr(*, path: str, attr: str):
"""Decorator: after the wrapped method runs, persist ``self.``
to ``path`` as JSON."""
def decorator(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
result = f(self, *args, **kwargs)
obj = getattr(self, attr)
logger.debug("Write %r to %r", obj, path)
_dump(path, obj)
return result
@functools.wraps(f)
async def async_wrapper(self, *args, **kwargs):
result = await f(self, *args, **kwargs)
obj = getattr(self, attr)
logger.debug("Write %r to %r", obj, path)
_dump(path, obj)
return result
if iscoroutinefunction(f):
return async_wrapper
return wrapper
return decorator
def unserialize(*, path: str, fallback: Union[Callable, object] = None):
"""Restore an object from ``path`` (JSON); a top-level list becomes a
deque to match the legacy queue API, and missing/unparseable input
returns ``fallback`` (called if callable)."""
try:
with open(path, "r", encoding="utf-8") as r:
obj = json.load(r)
except FileNotFoundError:
logger.warning("Can't find %s to unserialize", path)
except Exception as e:
logger.error("Unserialize failed with %r. Returning fallback", e)
else:
if isinstance(obj, list):
return collections.deque(obj)
return obj
return fallback() if callable(fallback) else fallback