Delayed Transitions
Timer-based auto-transitions with after — timeouts, polling, and auto-progression.
Delayed transitions let a state automatically transition after a specified time delay. No event needed — the machine moves on its own when the timer expires. This is perfect for session timeouts, polling loops, auto-save debouncing, and any workflow that involves waiting.
What Are Delayed Transitions?
In a normal state machine, transitions only fire when an event arrives. Delayed transitions break that rule: they fire after a timer expires. You configure them with the after property on a state, mapping millisecond delays to target states.
          5 min              30 sec
active ─────────► warning ──────────► expired
  ▲                  │
  └───── EXTEND ─────┘   (user clicks EXTEND → back to active)
When the machine enters a state with after timers, those timers start immediately. If the machine leaves that state before a timer fires (because of an event), the timer is cancelled automatically.
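The start-on-entry and cancel-on-exit behaviour can be sketched with plain asyncio. This is a minimal illustration of the idea, not the library's implementation; the StateTimers class and its enter/exit methods are hypothetical names, and the 50 ms delay is chosen only to keep the demo short.

```python
import asyncio


class StateTimers:
    """Hypothetical helper that owns the timers of the currently active state."""

    def __init__(self):
        self._tasks = []

    def enter(self, after, on_fire):
        """Start one timer per entry in `after` (delay in ms -> target)."""
        for delay_ms, target in after.items():
            task = asyncio.create_task(self._run(int(delay_ms), target, on_fire))
            self._tasks.append(task)

    def exit(self):
        """Cancel every timer; cancelling an already finished task is a no-op."""
        for task in self._tasks:
            task.cancel()
        self._tasks.clear()

    @staticmethod
    async def _run(delay_ms, target, on_fire):
        await asyncio.sleep(delay_ms / 1000)
        on_fire(target)


async def demo():
    fired = []
    timers = StateTimers()

    # Enter a state, then leave it before the 50 ms timer expires.
    timers.enter({"50": "warning"}, fired.append)
    await asyncio.sleep(0.01)
    timers.exit()
    await asyncio.sleep(0.08)
    fired_after_early_exit = list(fired)

    # Enter again and stay long enough for the timer to fire.
    timers.enter({"50": "warning"}, fired.append)
    await asyncio.sleep(0.15)
    timers.exit()
    return fired_after_early_exit, fired
```

Running asyncio.run(demo()) returns the targets recorded after the early exit and after the second, uninterrupted visit, which makes it easy to confirm that a cancelled timer never reports its target.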
JSON After Syntax
The after property is a dictionary mapping millisecond delays (as strings in JSON) to transition targets:
{
"after": {
"3000": "nextState"
}
}
After 3000 ms (3 seconds) in this state, the machine transitions to "nextState".
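Because the keys are millisecond counts written as strings, long delays such as "300000" are easy to mistype. One way to avoid that, sketched here with only the standard library, is to compute the key from a timedelta. The after_key helper is a hypothetical name and is not part of xstate_statemachine.

```python
from datetime import timedelta

ONE_MS = timedelta(milliseconds=1)


def after_key(delay: timedelta) -> str:
    """Convert a duration to the millisecond string key used in `after`."""
    if delay < timedelta(0) or delay % ONE_MS:
        raise ValueError("delay must be a non-negative whole number of milliseconds")
    return str(delay // ONE_MS)


# Build the same state fragment as above without hand-written millisecond values.
state = {
    "after": {
        after_key(timedelta(seconds=3)): "nextState",
    }
}
```

The resulting dict is an ordinary config fragment, so it can be merged into a machine definition like any hand-written one.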
Basic Example: Session Timeout
A session that warns the user before expiring:
{
"id": "sessionTimeout",
"initial": "active",
"states": {
"active": {
"after": {
"300000": "warning"
},
"on": {
"ACTIVITY": "active"
}
},
"warning": {
"after": {
"30000": "expired"
},
"on": {
"EXTEND": "active"
}
},
"expired": {
"type": "final"
}
}
}
Running it:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "sessionTimeout",
"initial": "active",
"states": {
"active": {
"after": {"300000": "warning"},
"on": {"ACTIVITY": "active"}
},
"warning": {
"after": {"30000": "expired"},
"on": {"EXTEND": "active"}
},
"expired": {"type": "final"}
}
}
machine = create_machine(config)
interp = SyncInterpreter(machine).start()
print(interp.active_state_ids)
# {'sessionTimeout.active'}
# User does something — timer resets
interp.send("ACTIVITY")
print(interp.active_state_ids)
# {'sessionTimeout.active'} (timer restarted)
interp.stop()
This machine:
- Starts in active
- After 5 minutes of inactivity, moves to warning
- The user gets 30 seconds to click “EXTEND” or it moves to expired
- Any ACTIVITY event re-enters active, resetting the 5-minute timer
After with Actions
Delayed transitions can trigger actions, just like event-driven transitions:
{
"monitoring": {
"after": {
"5000": {
"target": "next",
"actions": "logTimeout"
}
}
}
}
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "timedAction",
"initial": "waiting",
"states": {
"waiting": {
"after": {
"5000": {
"target": "done",
"actions": "logTimeout"
}
}
},
"done": {"type": "final"}
}
}
class TimedLogic(MachineLogic):
    def logTimeout(self, interpreter, context, event, action_def):
        print("Timer expired, transitioning to done")
machine = create_machine(config, logic=TimedLogic())
interp = SyncInterpreter(machine).start()
# After 5 seconds, "logTimeout" fires and machine moves to "done"
interp.stop()
After with Guards
You can conditionally block a delayed transition using a guard:
{
"monitoring": {
"after": {
"60000": {
"target": "stale",
"guard": "noRecentData"
}
}
}
}
If the guard noRecentData returns False when the timer fires, the transition is skipped — the machine stays in monitoring.
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "guardedTimer",
"initial": "monitoring",
"context": {"lastPing": 0},
"states": {
"monitoring": {
"after": {
"60000": {
"target": "stale",
"guard": "noRecentData"
}
},
"on": {
"PING": {"actions": "recordPing"}
}
},
"stale": {
"on": {"RESET": "monitoring"}
}
}
}
class MonitorLogic(MachineLogic):
    def noRecentData(self, context, event):
        """Returns True if no PING has ever been received (lastPing is still 0)."""
        return context.get("lastPing", 0) == 0

    def recordPing(self, interpreter, context, event, action_def):
        import time
        context["lastPing"] = time.time()
machine = create_machine(config, logic=MonitorLogic())
interp = SyncInterpreter(machine).start()
# If PING arrives before 60s, lastPing becomes non-zero
# and the guard returns False, preventing the stale transition
interp.send("PING")
print(interp.active_state_ids)
# {'guardedTimer.monitoring'}
interp.stop()
Note: Guard functions receive (context, event) and must return a bool. They must be synchronous.
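The guard in the example only checks whether a PING has ever arrived. A guard that looks at how old the last PING is might be sketched as follows. The 30-second threshold and the optional now parameter are assumptions made for illustration (now exists only so the function can be checked without waiting); the (context, event) part of the signature follows the note above.

```python
import time

STALE_AFTER_SECONDS = 30  # assumed threshold, not taken from the library


def no_recent_data(context, event, now=None):
    """Return True when the last PING is too old or never arrived."""
    now = time.time() if now is None else now
    last_ping = context.get("lastPing", 0)
    if last_ping == 0:
        return True  # no PING recorded yet
    return (now - last_ping) >= STALE_AFTER_SECONDS
```

Like any guard, it is a plain synchronous function that reads context and returns a bool, so it can be unit-tested without starting an interpreter.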
Multiple Timers per State
A single state can have multiple after timers running simultaneously:
{
"monitoring": {
"after": {
"5000": {"target": "monitoring", "actions": "heartbeat"},
"60000": {"target": "stale", "guard": "noRecentData"},
"300000": "timeout"
}
}
}
All three timers start when monitoring is entered:
- 5 seconds: self-transition with heartbeat action (re-enters, restarting all timers)
- 60 seconds: conditional move to stale (only if guard passes)
- 5 minutes: unconditional move to timeout
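Tip: The shortest timer fires first. Because the 5-second heartbeat re-enters the state, every timer is reset each time it fires, so the 60s and 300s timers restart too. As configured, that means the two longer timers are restarted every 5 seconds and never reach their deadlines.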
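How a short self-transition timer interacts with longer ones can be checked on paper with a small simulation. This sketch assumes the re-entry semantics described in this section (a self-transition restarts every timer) and ignores guards; simulate_visit is a hypothetical helper, not a library function.

```python
def simulate_visit(after, self_targets, horizon_ms):
    """Return (time_ms, target) for each timer that fires within horizon_ms.

    `after` maps a delay in ms to a target. Targets listed in `self_targets`
    re-enter the state and restart every timer. Guards are ignored.
    """
    fired = []
    if not after:
        return fired
    delays = sorted((int(ms), target) for ms, target in after.items())
    entered_at = 0
    while True:
        delay, target = delays[0]      # after each (re-)entry the shortest timer is next
        fire_at = entered_at + delay
        if fire_at > horizon_ms:
            return fired
        fired.append((fire_at, target))
        if target not in self_targets:
            return fired               # the machine left the state
        entered_at = fire_at           # re-entry: all timers restart


after = {"5000": "monitoring", "60000": "stale", "300000": "timeout"}
ten_minutes = simulate_visit(after, {"monitoring"}, 600_000)
```

Over ten simulated minutes every entry in ten_minutes targets monitoring, because the heartbeat keeps restarting the longer timers before they can elapse. Removing the 5-second entry lets the 60-second timer fire first.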
Timer Reset
When a state is re-entered (via an event transition or a self-transition), all after timers for that state are cancelled and restarted. This is key to implementing patterns like “idle timeout” — every user action resets the clock.
{
"active": {
"after": {
"300000": "warning"
},
"on": {
"ACTIVITY": "active"
}
}
}
Every ACTIVITY event re-enters active, which cancels the existing 5-minute timer and starts a fresh one.
Warning: Self-transitions (targeting the same state) will cause exit actions, timer cancellation, entry actions, and timer restart. This is intentional — it’s how XState works.
Pythonic After
The State class accepts an after parameter — a dict mapping millisecond delays to targets:
from xstate_statemachine import State, StateMachine, SyncInterpreter, action
class SessionMachine(StateMachine):
    machine_id = "session"
    active = State("active", initial=True, after={300000: "warning"},
                   on={"ACTIVITY": "active"})
    warning = State("warning", after={30000: "expired"},
                    on={"EXTEND": "active"})
    expired = State("expired", final=True)
machine = SessionMachine.create_machine()
interp = SyncInterpreter(machine).start()
print(interp.active_state_ids)
# {'session.active'}
interp.send("ACTIVITY")
print(interp.active_state_ids)
# {'session.active'}
interp.stop()
Using the functional API:
from xstate_statemachine import State, build_machine, SyncInterpreter
active = State("active", initial=True, after={300000: "warning"},
on={"ACTIVITY": "active"})
warning = State("warning", after={30000: "expired"},
on={"EXTEND": "active"})
expired = State("expired", final=True)
machine = build_machine(id="session", states=[active, warning, expired])
interp = SyncInterpreter(machine).start()
interp.stop()
Using the builder API:
from xstate_statemachine import MachineBuilder, SyncInterpreter
machine = (
MachineBuilder("session")
.state("active", initial=True, after={300000: "warning"},
on={"ACTIVITY": "active"})
.state("warning", after={30000: "expired"},
on={"EXTEND": "active"})
.state("expired", final=True)
.build()
)
interp = SyncInterpreter(machine).start()
interp.stop()
After in Nested States
Delayed transitions work inside compound (hierarchical) states. Timers in a child state are cancelled when the parent state is exited:
{
"id": "nestedTimer",
"initial": "loggedIn",
"states": {
"loggedIn": {
"initial": "dashboard",
"states": {
"dashboard": {
"after": {
"60000": {"target": "dashboard", "actions": "refreshData"}
}
},
"settings": {}
},
"on": {
"LOGOUT": "loggedOut"
}
},
"loggedOut": {}
}
}
When LOGOUT fires, the machine exits dashboard (cancelling its 60-second refresh timer) and then exits loggedIn.
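The exit order described above can be sketched by comparing dotted state paths: every state below the shared ancestor is exited, innermost first, and its timers go with it. This is a simplified model under stated assumptions (plain string paths, no parallel states, and no handling of self-transitions); states_exited and cancel_exited_timers are hypothetical helpers.

```python
def states_exited(current_path, target_path):
    """List the states left when moving between two dotted paths, innermost first."""
    current = current_path.split(".")
    target = target_path.split(".")
    # Count how many leading segments (shared ancestors) the two paths have in common.
    shared = 0
    while shared < min(len(current), len(target)) and current[shared] == target[shared]:
        shared += 1
    return [".".join(current[:i]) for i in range(len(current), shared, -1)]


def cancel_exited_timers(active_timers, exited):
    """Drop the timers owned by any exited state."""
    return {state: timers for state, timers in active_timers.items() if state not in exited}


active_timers = {"loggedIn.dashboard": {"60000": "dashboard"}}
exited = states_exited("loggedIn.dashboard", "loggedOut")
remaining = cancel_exited_timers(active_timers, exited)
```

For the LOGOUT case, exited lists loggedIn.dashboard before loggedIn, and no timers remain afterwards.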
Sync vs Async Timers
| Interpreter | Timer Behavior |
|---|---|
| Interpreter (async) | Uses asyncio.create_task / asyncio.sleep; timers run concurrently with your event loop |
| SyncInterpreter | Timers are scheduled and processed on send() calls; they fire when you next interact with the machine |
Tip: For real-time timer behavior (actual wall-clock delays), use the async Interpreter with asyncio. The SyncInterpreter is best for testing and non-real-time workflows.
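The difference matters because a timer that is only checked on interaction cannot fire on its own. The following sketch models that idea with recorded deadlines and an injectable clock. It is an illustration of the concept only and makes no claim about how SyncInterpreter is implemented; LazyTimers and its methods are hypothetical.

```python
class LazyTimers:
    """Records deadlines and only checks them when the caller asks."""

    def __init__(self, clock):
        self._clock = clock      # callable returning the current time in ms
        self._deadlines = []     # sorted list of (deadline_ms, target)

    def schedule(self, after):
        """Replace all deadlines, measured from the current clock value."""
        now = self._clock()
        self._deadlines = sorted((now + int(ms), target) for ms, target in after.items())

    def due(self):
        """Return targets whose deadline has passed, earliest first."""
        now = self._clock()
        return [target for deadline, target in self._deadlines if deadline <= now]


# A fake clock makes the behaviour observable without real waiting.
fake_now = [0]
timers = LazyTimers(lambda: fake_now[0])
timers.schedule({"3000": "nextState"})
```

Nothing happens while fake_now advances. The elapsed timer is only noticed when due() is called, which is the trade-off against a task that sleeps on the event loop and fires by itself.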
Complete Example: Polling Machine
A machine that polls an API at regular intervals, with error handling and a fixed 30-second retry delay that gives up once a maximum error count is reached:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "poller",
"initial": "idle",
"context": {"data": None, "errors": 0, "maxErrors": 3},
"states": {
"idle": {
"on": {"START": "polling"}
},
"polling": {
"invoke": {
"src": "fetchData",
"onDone": {
"target": "waiting",
"actions": "storeData"
},
"onError": {
"target": "retrying",
"actions": "incrementErrors"
}
}
},
"waiting": {
"after": {
"10000": "polling"
},
"on": {"STOP": "idle"}
},
"retrying": {
"after": {
"30000": {
"target": "polling",
"guard": "belowMaxErrors"
},
"30001": {
"target": "failed",
"guard": "atMaxErrors"
}
}
},
"failed": {
"on": {"RESET": "idle"}
}
}
}
class PollerLogic(MachineLogic):
    def fetchData(self, interpreter, context, event):
        return {"status": "ok", "value": 42}

    def storeData(self, interpreter, context, event, action_def):
        context["data"] = event.data
        context["errors"] = 0

    def incrementErrors(self, interpreter, context, event, action_def):
        context["errors"] += 1

    def belowMaxErrors(self, context, event):
        return context["errors"] < context["maxErrors"]

    def atMaxErrors(self, context, event):
        return context["errors"] >= context["maxErrors"]
machine = create_machine(config, logic=PollerLogic())
interp = SyncInterpreter(machine).start()
interp.send("START")
print(interp.active_state_ids)
# {'poller.waiting'} (fetchData succeeded, now waiting 10s to poll again)
print(interp.context["data"])
# {'status': 'ok', 'value': 42}
interp.send("STOP")
print(interp.active_state_ids)
# {'poller.idle'}
interp.stop()
Complete Example: Auto-Save with Debounce
A document editor that auto-saves 2 seconds after the last edit:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "autoSave",
"initial": "clean",
"context": {"content": "", "lastSaved": None},
"states": {
"clean": {
"on": {
"EDIT": {
"target": "dirty",
"actions": "updateContent"
}
}
},
"dirty": {
"after": {
"2000": "saving"
},
"on": {
"EDIT": {
"target": "dirty",
"actions": "updateContent"
}
}
},
"saving": {
"invoke": {
"src": "saveDocument",
"onDone": {
"target": "clean",
"actions": "markSaved"
},
"onError": {
"target": "dirty"
}
}
}
}
}
class AutoSaveLogic(MachineLogic):
    def updateContent(self, interpreter, context, event, action_def):
        context["content"] = event.data.get("text", context["content"])
        print(f"Content updated: {context['content']!r}")

    def saveDocument(self, interpreter, context, event):
        print(f"Saving: {context['content']!r}")
        return {"saved": True}

    def markSaved(self, interpreter, context, event, action_def):
        import time
        context["lastSaved"] = time.time()
        print("Document saved!")
machine = create_machine(config, logic=AutoSaveLogic())
interp = SyncInterpreter(machine).start()
# Type something
interp.send({"type": "EDIT", "text": "Hello"})
print(interp.active_state_ids)
# {'autoSave.dirty'}
# Type more — this resets the 2-second debounce timer
interp.send({"type": "EDIT", "text": "Hello, world!"})
print(interp.active_state_ids)
# {'autoSave.dirty'} (timer restarted)
# After 2 seconds of no edits, auto-save would fire
interp.stop()
The key insight: each EDIT event re-enters dirty, which resets the 2-second timer. The save only fires after the user stops typing for 2 full seconds. This is the classic debounce pattern, expressed declaratively.
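The debounce rule itself can be verified independently of any interpreter. Given the times at which EDIT events arrive, this sketch computes when a save would start, assuming each edit restarts the delay. debounce_fire_times is a hypothetical helper, and the tie-breaking choice (an edit arriving exactly at the deadline does not prevent the save) is an assumption.

```python
def debounce_fire_times(event_times_ms, delay_ms):
    """Return the times at which a debounced action fires.

    Each event restarts the timer; the action fires only when `delay_ms`
    passes with no further event.
    """
    fire_times = []
    events = sorted(event_times_ms)
    # Pair each event with the one that follows it (None after the last event).
    for current, following in zip(events, events[1:] + [None]):
        deadline = current + delay_ms
        # Ties go to the timer in this sketch.
        if following is None or following >= deadline:
            fire_times.append(deadline)
    return fire_times


burst = debounce_fire_times([0, 500, 1200], 2000)
two_sessions = debounce_fire_times([0, 5000], 2000)
```

A burst of three quick edits produces a single save 2 seconds after the last one, while two edits separated by a long pause produce two saves.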