Snapshots & State Restoration
Save and restore machine state — persistence, crash recovery, and testing.
Snapshots & State Restoration
Snapshots let you capture and restore a machine’s state at any point in time. This implements the Memento pattern, enabling persistence, crash recovery, workflow checkpointing, and targeted testing.
What are Snapshots?
A snapshot is a JSON string that captures the essential runtime state of an interpreter:
- status — the interpreter’s lifecycle status ("running", "stopped", etc.)
- context — the full context dictionary (all mutable data)
- state_ids — the list of all currently active state IDs
You create a snapshot with get_snapshot() and restore from one with from_snapshot().
get_snapshot() — Capturing State
The get_snapshot() method returns a JSON string representing the interpreter’s current state. Call it on any running interpreter at any time.
from xstate_statemachine import create_machine, SyncInterpreter
# Machine definition: a linear four-step workflow. Every NEXT event moves to
# the following step and names the "updateProgress" action; step4 is final.
config = {
    "id": "workflow",
    "initial": "step1",
    "context": {"progress": 0, "data": {}},
    "states": {
        "step1": {
            "on": {"NEXT": {"target": "step2", "actions": "updateProgress"}},
        },
        "step2": {
            "on": {"NEXT": {"target": "step3", "actions": "updateProgress"}},
        },
        "step3": {
            "on": {"NEXT": {"target": "step4", "actions": "updateProgress"}},
        },
        "step4": {"type": "final"},
    },
}
from xstate_statemachine import MachineLogic
# Implementation of the "updateProgress" action named in the machine config:
# every call adds 25 to the "progress" value stored in the context.
logic = MachineLogic(
    actions={
        "updateProgress": lambda i, ctx, e, a: ctx.update(
            {"progress": ctx["progress"] + 25}
        ),
    },
)

machine = create_machine(config, logic=logic)
interp = SyncInterpreter(machine).start()

# Advance to step2
interp.send("NEXT")
# Write a value straight into the context so it is part of the captured state.
interp.context["data"]["step1_result"] = "validated"

# Capture a snapshot
snapshot_json = interp.get_snapshot()
print(snapshot_json)
Output:
{
"status": "running",
"context": {
"progress": 25,
"data": {
"step1_result": "validated"
}
},
"state_ids": [
"workflow.step2"
]
}
The snapshot is a standard JSON string — you can store it anywhere: files, databases, message queues, or environment variables.
# Stop the original interpreter; the examples that follow work from the
# snapshot string rather than from this instance.
interp.stop()
from_snapshot() — Restoring State
The from_snapshot() class method creates a new interpreter instance pre-configured with the saved state. You provide the snapshot JSON string and the same machine definition that was used to create the original interpreter.
import json
# Re-create the same machine definition
machine = create_machine(config, logic=logic)
# Restore from the snapshot
# (from_snapshot() hands back the interpreter; start() is not called here)
restored = SyncInterpreter.from_snapshot(snapshot_json, machine)
print(restored.current_state_ids) # {'workflow.step2'}
print(restored.context["progress"]) # 25
print(restored.context["data"]) # {'step1_result': 'validated'}
print(restored.status) # 'running'
# Continue from where we left off
restored.send("NEXT") # step2 -> step3
print(restored.current_state_ids) # {'workflow.step3'}
print(restored.context["progress"]) # 50
restored.send("NEXT") # step3 -> step4 (final)
# Stop the restored interpreter now that the walkthrough is finished.
restored.stop()
Note:
from_snapshot() is a class method on both SyncInterpreter and Interpreter. Call it on the class you want to create:
restored_sync = SyncInterpreter.from_snapshot(snapshot_json, machine)
restored_async = Interpreter.from_snapshot(snapshot_json, machine)
Important Limitations
Warning:
from_snapshot() performs a static restoration. Be aware of these constraints:
- Entry actions are NOT re-run — The restored interpreter is placed directly into the saved states without executing their entry actions.
- Services are NOT restarted — Any invoke services that were running when the snapshot was taken are not re-invoked.
- Timers are NOT resumed — Any after delayed transitions are not rescheduled. The countdown does not persist across snapshots.
- Context is restored by value — The context dictionary is deserialized from JSON. Non-serializable values (functions, class instances, file handles) will be lost or converted to strings.
- Machine definition must match — The machine argument to from_snapshot() must have the same structure as the original. If state IDs have changed, restoration will fail with StateNotFoundError.
Persistence: Save to File
The simplest persistence pattern writes the snapshot to a JSON file:
from pathlib import Path
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
# Machine definition: a linear onboarding flow in which each CONTINUE event
# moves to the next screen; "complete" is the final state.
config = {
    "id": "onboarding",
    "initial": "welcome",
    "context": {"user": "alice", "steps_completed": []},
    "states": {
        "welcome": {"on": {"CONTINUE": "profile"}},
        "profile": {"on": {"CONTINUE": "preferences"}},
        "preferences": {"on": {"CONTINUE": "complete"}},
        "complete": {"type": "final"},
    },
}
# No logic argument is passed: the transitions in this config name no actions.
machine = create_machine(config)
interp = SyncInterpreter(machine).start()
interp.send("CONTINUE") # welcome -> profile
# Record progress in the context before the snapshot is taken.
interp.context["steps_completed"].append("welcome")
# Save to file
snapshot = interp.get_snapshot()
Path("onboarding_state.json").write_text(snapshot)
interp.stop()
# --- Later, in a different process or after a restart ---
# Load from file
saved = Path("onboarding_state.json").read_text()
machine = create_machine(config) # Same definition
restored = SyncInterpreter.from_snapshot(saved, machine)
print(restored.current_state_ids) # {'onboarding.profile'}
print(restored.context["steps_completed"]) # ['welcome']
# Continue the workflow
restored.send("CONTINUE") # profile -> preferences
restored.context["steps_completed"].append("profile")
# NOTE(review): nothing after this point writes onboarding_state.json again,
# so the file still holds the earlier snapshot; save a new one if the step
# above has to survive another restart.
restored.stop()
Database Persistence: SQLite Example
For production systems, store snapshots in a database:
import sqlite3
import json
from xstate_statemachine import create_machine, SyncInterpreter
# --- Database Setup ---
def init_db(db_path="machines.db"):
    """Open the snapshot database and make sure its table exists.

    Args:
        db_path: Database location handed to sqlite3.connect().

    Returns:
        The open sqlite3 connection; closing it is left to the caller.
    """
    conn = sqlite3.connect(db_path)
    # IF NOT EXISTS makes this safe to run every time the program starts.
    conn.execute("""
CREATE TABLE IF NOT EXISTS machine_snapshots (
machine_id TEXT PRIMARY KEY,
snapshot TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
    conn.commit()
    return conn
def save_snapshot(conn, machine_id, interpreter):
    """Save the interpreter's current state to the database.

    Args:
        conn: Open sqlite3 connection to a database that has the
            machine_snapshots table.
        machine_id: Key of the row to write; an existing row with the same
            key is replaced.
        interpreter: Object whose get_snapshot() result is stored in the
            snapshot column.
    """
    snapshot = interpreter.get_snapshot()
    # The connection context manager commits when the block succeeds and
    # rolls back if the write raises, so no half-finished transaction is
    # left open on the connection.
    with conn:
        conn.execute(
            """INSERT OR REPLACE INTO machine_snapshots
(machine_id, snapshot, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)""",
            (machine_id, snapshot),
        )
    print(f"Saved snapshot for '{machine_id}'")
def load_snapshot(conn, machine_id):
    """Load a saved snapshot from the database.

    Args:
        conn: Open sqlite3 connection to a database that has the
            machine_snapshots table.
        machine_id: Key of the row to read.

    Returns:
        The stored snapshot value, or None when no row has that key.
    """
    cursor = conn.execute(
        "SELECT snapshot FROM machine_snapshots WHERE machine_id = ?",
        (machine_id,),
    )
    row = cursor.fetchone()
    return row[0] if row else None
# --- Usage ---
# Machine definition: one order moving placed -> confirmed -> shipped ->
# delivered, with "delivered" as the final state.
config = {
    "id": "order-42",
    "initial": "placed",
    "context": {"items": ["widget"], "total": 29.99},
    "states": {
        "placed": {"on": {"CONFIRM": "confirmed"}},
        "confirmed": {"on": {"SHIP": "shipped"}},
        "shipped": {"on": {"DELIVER": "delivered"}},
        "delivered": {"type": "final"},
    },
}
# First run: confirm the order, store its snapshot, then stop the interpreter.
conn = init_db()
machine = create_machine(config)
interp = SyncInterpreter(machine).start()
interp.send("CONFIRM")
save_snapshot(conn, "order-42", interp)
interp.stop()

# --- Later ---
saved_json = load_snapshot(conn, "order-42")
# load_snapshot() returns None when no row exists, so restore only on a hit.
if saved_json:
    machine = create_machine(config)
    restored = SyncInterpreter.from_snapshot(saved_json, machine)
    print(f"Restored state: {restored.current_state_ids}")
    # {'order-42.confirmed'}
    restored.send("SHIP")
    # Store the new state so the next restore picks up after the SHIP event.
    save_snapshot(conn, "order-42", restored)
    restored.stop()
conn.close()
Async Snapshots
Snapshots work identically with the async Interpreter:
import asyncio
from xstate_statemachine import create_machine, Interpreter
# Machine definition: three phases advanced by ADVANCE events; "phase3" is
# the final state.
config = {
    "id": "asyncWorkflow",
    "initial": "phase1",
    "context": {"results": []},
    "states": {
        "phase1": {"on": {"ADVANCE": "phase2"}},
        "phase2": {"on": {"ADVANCE": "phase3"}},
        "phase3": {"type": "final"},
    },
}
async def main():
    """Advance the async workflow once, snapshot it, and restore the snapshot.

    Starts an Interpreter, sends one ADVANCE event, captures the snapshot,
    stops the interpreter, and then builds a second Interpreter from the
    snapshot and prints its state IDs and status.
    """
    machine = create_machine(config)
    interp = await Interpreter(machine).start()
    await interp.send("ADVANCE")
    await asyncio.sleep(0.1)  # Let the event process

    # Capture snapshot (synchronous method — no await needed)
    snapshot = interp.get_snapshot()
    await interp.stop()

    # Restore into an async interpreter
    machine = create_machine(config)
    restored = Interpreter.from_snapshot(snapshot, machine)
    print(restored.current_state_ids)  # {'asyncWorkflow.phase2'}
    print(restored.status)  # 'running'


asyncio.run(main())
Testing with Snapshots
Snapshots are powerful for testing because they let you jump directly to a specific state without replaying the full event sequence:
import json
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
# Machine definition: checkout flow cart -> shipping -> payment -> processing
# -> confirmed. A FAILURE while processing goes back to "payment".
config = {
    "id": "checkout",
    "initial": "cart",
    "context": {"items": [], "total": 0, "payment_status": None},
    "states": {
        "cart": {"on": {"PROCEED": "shipping"}},
        "shipping": {"on": {"CONFIRM_ADDRESS": "payment"}},
        "payment": {
            "on": {
                "PAY": {"target": "processing", "actions": "startPayment"},
            },
        },
        "processing": {
            "on": {
                "SUCCESS": "confirmed",
                "FAILURE": "payment",
            },
        },
        "confirmed": {"type": "final"},
    },
}
# Implementation of the "startPayment" action named on the PAY transition:
# it sets "payment_status" in the context to "processing".
logic = MachineLogic(
    actions={
        "startPayment": lambda i, ctx, e, a: ctx.update(
            {"payment_status": "processing"}
        ),
    },
)
# --- Test: Jump directly to the payment state ---
def test_payment_flow():
    """Test the payment flow without navigating through cart and shipping."""
    # Create a snapshot that puts us directly in the payment state
    snapshot = json.dumps({
        "status": "running",
        "context": {
            "items": [{"name": "Widget", "price": 29.99}],
            "total": 29.99,
            "payment_status": None,
        },
        "state_ids": ["checkout.payment"],
    })
    machine = create_machine(config, logic=logic)
    interp = SyncInterpreter.from_snapshot(snapshot, machine)

    # Test: send PAY event
    interp.send("PAY")
    assert interp.context["payment_status"] == "processing"
    assert interp.current_state_ids == {"checkout.processing"}

    # Test: payment succeeds
    interp.send("SUCCESS")
    assert interp.current_state_ids == {"checkout.confirmed"}

    interp.stop()
    print("test_payment_flow PASSED")
def test_payment_failure_retry():
    """Test that payment failure returns to the payment state."""
    # Hand-written snapshot that places the machine in "processing" directly.
    snapshot = json.dumps({
        "status": "running",
        "context": {"items": [], "total": 0, "payment_status": None},
        "state_ids": ["checkout.processing"],
    })
    machine = create_machine(config, logic=logic)
    interp = SyncInterpreter.from_snapshot(snapshot, machine)

    interp.send("FAILURE")
    assert interp.current_state_ids == {"checkout.payment"}

    interp.stop()
    print("test_payment_failure_retry PASSED")
# Run both checks directly; each prints its own PASSED line when it succeeds.
test_payment_flow()
test_payment_failure_retry()
Complete Example: Long-Running Workflow with Checkpointing
This pattern saves a snapshot after each step, enabling crash recovery:
from pathlib import Path
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
# File that holds the most recent snapshot between runs.
CHECKPOINT_FILE = Path("workflow_checkpoint.json")

# Machine definition: ingest -> transform -> load pipeline. Each *_DONE event
# moves to the next phase and names an action; "complete" is the final state.
config = {
    "id": "dataProcessing",
    "initial": "ingesting",
    "context": {
        "records_ingested": 0,
        "records_transformed": 0,
        "records_loaded": 0,
        "errors": [],
    },
    "states": {
        "ingesting": {
            "on": {
                "INGEST_DONE": {"target": "transforming", "actions": "markIngested"},
            },
        },
        "transforming": {
            "on": {
                "TRANSFORM_DONE": {"target": "loading", "actions": "markTransformed"},
            },
        },
        "loading": {
            "on": {
                "LOAD_DONE": {"target": "complete", "actions": "markLoaded"},
            },
        },
        "complete": {"type": "final"},
    },
}
# Implementations of the three actions named in the machine config. Each one
# copies the "count" value from the event payload (0 when absent) into its
# own counter in the context.
logic = MachineLogic(
    actions={
        "markIngested": lambda i, ctx, e, a: ctx.update(
            {"records_ingested": e.payload.get("count", 0)}
        ),
        "markTransformed": lambda i, ctx, e, a: ctx.update(
            {"records_transformed": e.payload.get("count", 0)}
        ),
        "markLoaded": lambda i, ctx, e, a: ctx.update(
            {"records_loaded": e.payload.get("count", 0)}
        ),
    },
)
def checkpoint(interp):
    """Save current state to disk after each step.

    Args:
        interp: Interpreter whose get_snapshot() result is written to
            CHECKPOINT_FILE.
    """
    # Write to a sibling temp file and rename it over the checkpoint, so a
    # crash in the middle of the write cannot leave a truncated checkpoint
    # that the next run would then fail to restore.
    tmp_file = CHECKPOINT_FILE.with_name(CHECKPOINT_FILE.name + ".tmp")
    tmp_file.write_text(interp.get_snapshot())
    tmp_file.replace(CHECKPOINT_FILE)
    print(f"Checkpoint saved: {interp.current_state_ids}")
def resume_or_start():
    """Resume from checkpoint if available, otherwise start fresh.

    Returns:
        A SyncInterpreter restored from CHECKPOINT_FILE when that file
        exists, otherwise a newly started one.
    """
    machine = create_machine(config, logic=logic)
    if CHECKPOINT_FILE.exists():
        saved = CHECKPOINT_FILE.read_text()
        print("Resuming from checkpoint...")
        return SyncInterpreter.from_snapshot(saved, machine)
    else:
        print("Starting fresh...")
        return SyncInterpreter(machine).start()
# --- Main workflow ---
interp = resume_or_start()
current = interp.current_state_ids

# Only process steps that haven't been completed
# Each phase is guarded by the state the interpreter is in, so a run resumed
# from a checkpoint skips the phases that already finished.
if "dataProcessing.ingesting" in current:
    print("Running ingest phase...")
    interp.send("INGEST_DONE", count=1500)
    checkpoint(interp)

if "dataProcessing.transforming" in interp.current_state_ids:
    print("Running transform phase...")
    interp.send("TRANSFORM_DONE", count=1450)
    checkpoint(interp)

if "dataProcessing.loading" in interp.current_state_ids:
    print("Running load phase...")
    interp.send("LOAD_DONE", count=1450)
    checkpoint(interp)

print(f"Final context: {interp.context}")
interp.stop()

# Clean up checkpoint on success
if CHECKPOINT_FILE.exists():
    CHECKPOINT_FILE.unlink()
    print("Checkpoint cleaned up.")
Complete Example: Crash Recovery Pattern
import json
import sys
from pathlib import Path
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
class RecoverableWorkflow:
    """A workflow wrapper that automatically persists and recovers state.

    The interpreter's snapshot is written to ``storage_path`` after every
    event sent through send(), and start() restores from that file when it
    exists.
    """

    def __init__(self, config, logic=None, storage_path="workflow_state.json"):
        """Remember the machine definition and where its snapshot is kept.

        Args:
            config: Machine configuration passed to create_machine().
            logic: Optional logic object passed to create_machine().
            storage_path: File that holds the persisted snapshot.
        """
        self.config = config
        self.logic = logic
        self.storage_path = Path(storage_path)
        # Set by start(); the other methods expect start() to have been called.
        self.interp = None

    def start(self):
        """Start or resume the workflow.

        Returns:
            This wrapper, so calls can be chained.
        """
        machine = create_machine(self.config, logic=self.logic)
        if self.storage_path.exists():
            snapshot = self.storage_path.read_text()
            self.interp = SyncInterpreter.from_snapshot(snapshot, machine)
            print(f"Recovered workflow at state: {self.interp.current_state_ids}")
        else:
            self.interp = SyncInterpreter(machine).start()
            print(f"Started new workflow at state: {self.interp.current_state_ids}")
        return self

    def send(self, event_type, **payload):
        """Send an event and auto-save the state.

        Args:
            event_type: Event name forwarded to the interpreter.
            **payload: Keyword arguments forwarded with the event.

        Returns:
            This wrapper, so calls can be chained.
        """
        self.interp.send(event_type, **payload)
        self._save()
        return self

    def _save(self):
        """Persist current state to disk."""
        # Write to a sibling temp file and rename it over the real one, so a
        # crash in the middle of the write cannot leave a truncated state
        # file that the next start() would fail to restore.
        tmp_path = self.storage_path.with_name(self.storage_path.name + ".tmp")
        tmp_path.write_text(self.interp.get_snapshot())
        tmp_path.replace(self.storage_path)

    def finish(self):
        """Stop the interpreter and clean up persistence."""
        self.interp.stop()
        try:
            self.storage_path.unlink()
        except FileNotFoundError:
            # Nothing was persisted (no event was sent), so nothing to remove.
            pass
        print("Workflow completed and cleaned up.")

    @property
    def state(self):
        """The interpreter's current state IDs."""
        return self.interp.current_state_ids

    @property
    def context(self):
        """The interpreter's context."""
        return self.interp.context
# Usage
# Machine definition: build -> test -> deploy pipeline. Passing tests lead to
# "deploying", failing tests to "failed"; "done" and "failed" are both final.
config = {
    "id": "deploy",
    "initial": "building",
    "context": {"version": "1.2.3", "status": "pending"},
    "states": {
        "building": {"on": {"BUILD_OK": "testing"}},
        "testing": {
            "on": {"TESTS_PASS": "deploying", "TESTS_FAIL": "failed"},
        },
        "deploying": {"on": {"DEPLOY_OK": "done"}},
        "done": {"type": "final"},
        "failed": {"type": "final"},
    },
}
# start() restores from workflow_state.json (the default storage_path) when
# that file exists; otherwise it starts a new interpreter.
workflow = RecoverableWorkflow(config).start()
workflow.send("BUILD_OK")
print(f"State: {workflow.state}") # {'deploy.testing'}
# If the process crashes here, the next run will resume at 'testing'
workflow.send("TESTS_PASS")
workflow.send("DEPLOY_OK")
# finish() stops the interpreter and deletes the saved state file.
workflow.finish()