Interpreters
Execute state machines with async or sync interpreters — complete guide.
An interpreter is the runtime engine that executes a state machine. It processes events, evaluates guards, runs actions, invokes services, and manages state transitions. Without an interpreter, a machine definition is just data.
Think of it this way:
- A machine (
MachineNode) is the blueprint — it defines states, transitions, and rules. - An interpreter is the engine — it brings the blueprint to life and tracks the current state.
What Is an Interpreter?
The interpreter manages the full lifecycle of a running state machine:
- Start — enters the initial state (runs entry actions).
- Receive events — matches events against transition rules.
- Evaluate guards — checks boolean conditions on transitions.
- Execute actions — runs side effects (entry, exit, and transition actions).
- Invoke services — starts async/sync operations and handles their results.
- Track state — maintains the current state(s) and context.
- Stop — exits all active states (runs exit actions) and shuts down.
This library provides two interpreter implementations:
| Interpreter | Import | Event Loop | Use Case |
|---|---|---|---|
Interpreter |
from xstate_statemachine import Interpreter |
asyncio |
Web servers, async frameworks, async services |
SyncInterpreter |
from xstate_statemachine import SyncInterpreter |
None | Scripts, CLI tools, Django views, testing |
Both interpreters share the same API surface — the only difference is async/await vs. synchronous calls.
Async Interpreter
Use Interpreter for asyncio-based applications — web servers (FastAPI, aiohttp), async background workers, or any codebase built on async/await.
Full Example
import asyncio
from xstate_statemachine import create_machine, Interpreter, MachineLogic
config = {
"id": "fetchMachine",
"initial": "idle",
"context": {"data": None, "error": None},
"states": {
"idle": {
"on": {"FETCH": "loading"}
},
"loading": {
"invoke": {
"src": "fetchData",
"onDone": {"target": "success", "actions": "storeData"},
"onError": {"target": "error", "actions": "storeError"}
}
},
"success": {
"on": {"REFRESH": "loading", "RESET": "idle"}
},
"error": {
"on": {"RETRY": "loading", "RESET": "idle"}
}
}
}
class FetchLogic(MachineLogic):
async def fetchData(self, interpreter, context, event):
import aiohttp
async with aiohttp.ClientSession() as session:
resp = await session.get("https://api.example.com/data")
return await resp.json()
def storeData(self, interpreter, context, event, action_def):
context["data"] = event.data
def storeError(self, interpreter, context, event, action_def):
context["error"] = str(event.data)
async def main():
machine = create_machine(config, logic=FetchLogic())
interpreter = Interpreter(machine)
await interpreter.start()
print(interpreter.active_state_ids)
# {'fetchMachine.idle'}
await interpreter.send("FETCH")
# idle → loading → (service runs) → success or error
print(interpreter.active_state_ids)
print(interpreter.context)
await interpreter.stop()
asyncio.run(main())
Key Points
await interpreter.start()— enters the initial state and returns the interpreter (for chaining).await interpreter.send("EVENT")— sends an event and processes the resulting transition(s).await interpreter.stop()— exits all active states and shuts down.- Services defined with
async defare awaited automatically.
Use Cases
- FastAPI / Starlette route handlers
- aiohttp web servers
- Celery async tasks
- WebSocket connection state management
- Any
asyncio.run()orasync defcontext
Sync Interpreter
Use SyncInterpreter for synchronous code — scripts, CLI tools, Django views, Flask handlers, or test suites.
Full Example
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "toggleMachine",
"initial": "inactive",
"context": {"toggles": 0},
"states": {
"inactive": {
"on": {"ACTIVATE": {"target": "active", "actions": "logToggle"}}
},
"active": {
"on": {"DEACTIVATE": {"target": "inactive", "actions": "logToggle"}}
}
}
}
class ToggleLogic(MachineLogic):
def logToggle(self, interpreter, context, event, action_def):
context["toggles"] += 1
print(f"Toggle #{context['toggles']} → {interpreter.active_state_ids}")
machine = create_machine(config, logic=ToggleLogic())
# .start() is chainable — returns the interpreter
interp = SyncInterpreter(machine).start()
print(interp.active_state_ids)
# {'toggleMachine.inactive'}
interp.send("ACTIVATE")
# Toggle #1 → {'toggleMachine.active'}
interp.send("DEACTIVATE")
# Toggle #2 → {'toggleMachine.inactive'}
print(interp.context)
# {'toggles': 2}
interp.stop()
Key Points
.start()returnsself, so you can chain:SyncInterpreter(machine).start()..send("EVENT")processes the event synchronously and blocks until all actions complete..stop()exits all active states and runs their exit actions.- Services defined with
def(notasync def) are called directly.
Use Cases
- Scripts and CLI tools
- Django views and middleware
- Flask route handlers
- pytest test suites
- Quick prototyping and REPL exploration
Key Properties and Methods
| Property / Method | Type | Description |
|---|---|---|
.start() |
method | Initialize the interpreter and enter the initial state. Runs entry actions. Returns the interpreter (chainable). |
.stop() |
method | Exit all active states (runs exit actions) and shut down. |
.send(event, **kwargs) |
method | Send an event to the machine. Accepts string, dict, or Event object. Extra kwargs become event payload. |
.send_events(events) |
method | Send multiple events in sequence. Each event is processed before the next. |
.active_state_ids |
set[str] |
The set of currently active state IDs (e.g., {'machine.idle'}). |
.context |
dict |
The current machine context. Mutable — actions can modify this directly. |
.is_running |
bool |
True after .start(), False after .stop(). |
.plugins |
list |
List of attached plugin instances. Set before .start(). |
Sending Events — All Formats
The .send() method accepts events in multiple formats. Use whichever is most convenient:
String Shorthand
The simplest form — just the event name:
interp.send("CLICK")
interp.send("SUBMIT")
interp.send("TIMER")
With Payload (Keyword Arguments)
Pass extra data as keyword arguments. These become accessible in actions and guards via event.payload:
interp.send("LOGIN", username="alice", password="secret")
interp.send("UPDATE_PROFILE", name="Alice", email="alice@example.com")
interp.send("ADD_ITEM", product_id=42, quantity=3)
Event Object
Create an Event instance directly:
from xstate_statemachine import Event
event = Event(type="LOGIN", payload={"username": "alice", "password": "secret"})
interp.send(event)
Dict Form
Pass a dict with a type key:
interp.send({"type": "LOGIN", "username": "alice", "password": "secret"})
interp.send({"type": "ADD_ITEM", "product_id": 42, "quantity": 3})
Multiple Events at Once
Send a list of events with send_events(). Each event is fully processed (including all resulting transitions, actions, and services) before the next one starts:
interp.send_events(["STEP_1", "STEP_2", "STEP_3"])
# Equivalent to:
# interp.send("STEP_1")
# interp.send("STEP_2")
# interp.send("STEP_3")
You can mix formats in the list:
interp.send_events([
"START",
{"type": "CONFIG", "mode": "advanced"},
Event(type="READY", payload={}),
])
Event Payloads — Accessing Event Data
When you send an event with payload data, actions and guards can access it through the event parameter:
from xstate_statemachine import (
create_machine, SyncInterpreter, MachineLogic
)
config = {
"id": "userMachine",
"initial": "idle",
"context": {"user": None},
"states": {
"idle": {
"on": {"LOGIN": {"target": "loggedIn", "actions": "storeUser"}}
},
"loggedIn": {
"on": {"LOGOUT": {"target": "idle", "actions": "clearUser"}}
}
}
}
class UserLogic(MachineLogic):
def storeUser(self, interpreter, context, event, action_def):
# Access payload data from keyword arguments
context["user"] = {
"username": event.payload.get("username"),
"role": event.payload.get("role", "user"),
}
print(f"Logged in as {context['user']['username']}")
def clearUser(self, interpreter, context, event, action_def):
context["user"] = None
machine = create_machine(config, logic=UserLogic())
interp = SyncInterpreter(machine).start()
interp.send("LOGIN", username="alice", role="admin")
# Output: Logged in as alice
print(interp.context["user"])
# {'username': 'alice', 'role': 'admin'}
interp.stop()
Note: For service
onDoneevents, the service’s return value is available asevent.data. ForonErrorevents, the raised exception is inevent.data.
Interpreter Lifecycle
The interpreter follows a strict lifecycle:
create start() send() stop()
───────────► [Created] ─────► [Running] ─────► [Stopped]
│ ▲
│ │
└────┘
send() / process events
Step-by-Step
from xstate_statemachine import create_machine, SyncInterpreter
config = {
"id": "lifecycle",
"initial": "idle",
"context": {},
"states": {
"idle": {"on": {"GO": "running"}, "entry": "onEnterIdle"},
"running": {"on": {"STOP": "done"}, "entry": "onEnterRunning"},
"done": {"type": "final", "entry": "onEnterDone"}
}
}
machine = create_machine(config)
# 1. CREATE — machine is defined but not running
interp = SyncInterpreter(machine)
print(interp.is_running) # False
print(interp.active_state_ids) # set()
# 2. START — enters initial state, runs entry actions
interp.start()
print(interp.is_running) # True
print(interp.active_state_ids) # {'lifecycle.idle'}
# 3. SEND EVENTS — transitions occur
interp.send("GO")
print(interp.active_state_ids) # {'lifecycle.running'}
interp.send("STOP")
print(interp.active_state_ids) # {'lifecycle.done'}
# 4. STOP — exits all states, runs exit actions
interp.stop()
print(interp.is_running) # False
Idempotency and Safety
Both start() and stop() are safe to call multiple times:
start()on an already-running interpreter is a no-opstop()on an already-stopped interpreter is a no-opsend()on a stopped interpreter is silently ignored
interp = SyncInterpreter(machine).start()
interp.start() # No effect — already running
interp.stop()
interp.stop() # No effect — already stopped
interp.send("EVENT") # Silently ignored — interpreter is stopped
Event Processing: Queue Semantics
The async Interpreter uses an internal event queue. When you call await interp.send("EVENT"), the event is placed on the queue and processed by a background event loop. This means:
- One
send()call may trigger multiple transitions if the target state hasalways(eventless) transitions - After processing an event, the interpreter automatically checks for and processes any matching
alwaystransitions until no more apply - Timer-based
aftertransitions are scheduled as background tasks
The SyncInterpreter processes events immediately and synchronously within the send() call — there is no background queue.
Automatic (Eventless) Transitions
When a state has always transitions, the interpreter evaluates them immediately after entering the state — no event needed:
config = {
"id": "autoRouter",
"initial": "checking",
"context": {"role": "admin"},
"states": {
"checking": {
"always": [
{"target": "adminPanel", "guard": "isAdmin"},
{"target": "userDashboard"}
]
},
"adminPanel": {},
"userDashboard": {}
}
}
class Logic(MachineLogic):
def isAdmin(self, context, event):
return context.get("role") == "admin"
machine = create_machine(config, logic=Logic())
interp = SyncInterpreter(machine).start()
# No send() needed — the machine automatically transitions
# through 'checking' into 'adminPanel' via the always transition
print(interp.active_state_ids)
# {'autoRouter.adminPanel'}
interp.stop()
Note: A single
send()call can trigger a chain of transitions if states along the path havealwaystransitions. The interpreter keeps processing until it reaches a stable state with no pending eventless transitions.
Plugin Attachment
Plugins observe machine execution without modifying behavior. Attach them before calling .start():
from xstate_statemachine import (
create_machine, SyncInterpreter, LoggingInspector
)
config = {
"id": "demo",
"initial": "a",
"states": {
"a": {"on": {"GO": "b"}},
"b": {"on": {"GO": "c"}},
"c": {"type": "final"}
}
}
machine = create_machine(config)
interp = SyncInterpreter(machine)
# Attach plugins BEFORE starting
interp.plugins = [LoggingInspector()]
interp.start()
interp.send("GO") # a → b — logged by plugin
interp.send("GO") # b → c — logged by plugin
interp.stop()
Output:
🕵️ [INSPECT] Transition: ['demo.a'] -> ['demo.b'] on Event 'GO'
🕵️ [INSPECT] New Context: {}
🕵️ [INSPECT] Transition: ['demo.b'] -> ['demo.c'] on Event 'GO'
🕵️ [INSPECT] New Context: {}
Custom Plugin Example
from xstate_statemachine import PluginBase
class MetricsPlugin(PluginBase):
def __init__(self):
self.transition_count = 0
self.events_received = []
def on_transition(self, interpreter, from_states, to_states, transition):
self.transition_count += 1
def on_event_received(self, interpreter, event):
self.events_received.append(event.type)
metrics = MetricsPlugin()
interp.plugins = [metrics, LoggingInspector()] # Multiple plugins
interp.start()
# ... use the machine ...
print(f"Total transitions: {metrics.transition_count}")
Using with the Pythonic API
The interpreters work identically with machines built using the Pythonic API:
from xstate_statemachine import (
State, StateMachine, SyncInterpreter,
action, guard, LoggingInspector
)
class OrderMachine(StateMachine):
machine_id = "order"
initial_context = {"items": [], "total": 0}
cart = State("cart", initial=True)
checkout = State("checkout")
confirmed = State("confirmed", final=True)
begin_checkout = cart.to(checkout, event="CHECKOUT", guard="hasItems")
confirm = checkout.to(confirmed, event="CONFIRM")
back_to_cart = checkout.to(cart, event="BACK")
@guard
def has_items(self, context, event):
return len(context.get("items", [])) > 0
@action
def add_item(self, interpreter, context, event, action_def):
item = event.payload.get("item", "unknown")
context["items"].append(item)
add = cart.internal("ADD_ITEM", actions=["addItem"])
# Build and run
machine = OrderMachine.create_machine()
interp = SyncInterpreter(machine)
interp.plugins = [LoggingInspector()]
interp.start()
interp.send("ADD_ITEM", item="Widget")
interp.send("ADD_ITEM", item="Gadget")
interp.send("CHECKOUT")
print(interp.active_state_ids)
# {'order.checkout'}
interp.send("CONFIRM")
print(interp.context["items"])
# ['Widget', 'Gadget']
interp.stop()
Error Handling During Event Processing
If an action raises an exception, the interpreter propagates it. Wrap .send() calls in try/except for graceful error handling:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "risky",
"initial": "idle",
"states": {
"idle": {"on": {"GO": {"target": "processing", "actions": "riskyAction"}}},
"processing": {},
"error": {}
}
}
class RiskyLogic(MachineLogic):
def riskyAction(self, interpreter, context, event, action_def):
raise ValueError("Something went wrong!")
machine = create_machine(config, logic=RiskyLogic())
interp = SyncInterpreter(machine).start()
try:
interp.send("GO")
except ValueError as e:
print(f"Caught error: {e}")
# Handle the error — the machine state depends on
# when the error occurred during transition processing
interp.stop()
Tip: For expected errors (like network failures), use
invoke/onErrorinstead of try/except. TheonErrortransition is the idiomatic way to handle service errors in state machines.
Testing with SyncInterpreter
SyncInterpreter is ideal for testing — no async boilerplate, no event loops:
import pytest
from xstate_statemachine import (
State, StateMachine, SyncInterpreter, guard, action
)
class LoginMachine(StateMachine):
machine_id = "login"
initial_context = {"username": "", "authenticated": False}
idle = State("idle", initial=True)
authenticating = State("authenticating")
logged_in = State("loggedIn")
error = State("error")
attempt = idle.to(authenticating, event="LOGIN")
success = authenticating.to(logged_in, event="SUCCESS", actions=["setAuthenticated"])
failure = authenticating.to(error, event="FAILURE")
retry = error.to(idle, event="RETRY")
logout = logged_in.to(idle, event="LOGOUT", actions=["clearAuth"])
@action
def set_authenticated(self, interpreter, context, event, action_def):
context["authenticated"] = True
context["username"] = event.payload.get("username", "")
@action
def clear_auth(self, interpreter, context, event, action_def):
context["authenticated"] = False
context["username"] = ""
class TestLoginMachine:
def setup_method(self):
"""Create a fresh interpreter for each test."""
machine = LoginMachine.create_machine()
self.interp = SyncInterpreter(machine).start()
def teardown_method(self):
"""Clean up the interpreter."""
self.interp.stop()
def test_starts_in_idle(self):
assert "login.idle" in self.interp.active_state_ids
def test_login_flow(self):
self.interp.send("LOGIN")
assert "login.authenticating" in self.interp.active_state_ids
self.interp.send("SUCCESS", username="alice")
assert "login.loggedIn" in self.interp.active_state_ids
assert self.interp.context["authenticated"] is True
assert self.interp.context["username"] == "alice"
def test_login_failure_and_retry(self):
self.interp.send("LOGIN")
self.interp.send("FAILURE")
assert "login.error" in self.interp.active_state_ids
self.interp.send("RETRY")
assert "login.idle" in self.interp.active_state_ids
def test_logout_clears_auth(self):
self.interp.send("LOGIN")
self.interp.send("SUCCESS", username="bob")
assert self.interp.context["authenticated"] is True
self.interp.send("LOGOUT")
assert "login.idle" in self.interp.active_state_ids
assert self.interp.context["authenticated"] is False
assert self.interp.context["username"] == ""
def test_ignores_invalid_events(self):
"""Events that don't match any transition are silently ignored."""
self.interp.send("NONEXISTENT_EVENT")
assert "login.idle" in self.interp.active_state_ids
def test_send_events_batch(self):
"""send_events processes events in sequence."""
self.interp.send_events(["LOGIN", "FAILURE", "RETRY"])
assert "login.idle" in self.interp.active_state_ids
Run tests with:
pytest test_login.py -v
Async vs. Sync Comparison
| Feature | Interpreter (async) |
SyncInterpreter (sync) |
|---|---|---|
| Start | await interp.start() |
interp.start() |
| Send | await interp.send("E") |
interp.send("E") |
| Stop | await interp.stop() |
interp.stop() |
| Chainable start | await Interpreter(m).start() |
SyncInterpreter(m).start() |
| Services | async def (awaited) |
def (called directly) |
| Actions | Sync (same for both) | Sync (same for both) |
| Guards | Sync (same for both) | Sync (same for both) |
| Event loop | Requires asyncio |
No event loop needed |
| Plugins | Same API | Same API |
| Context access | interp.context |
interp.context |
| Active states | interp.active_state_ids |
interp.active_state_ids |
| Thread safety | Single-threaded (asyncio) | Single-threaded |
Tip: Use
SyncInterpreterfor testing even if your production code usesInterpreter. It eliminates async boilerplate in tests and makes assertions straightforward.
Tip: If you’re unsure which to use, start with
SyncInterpreter. You can always switch toInterpreterlater — the machine definition doesn’t change, only the interpreter and service implementations.