Advanced Patterns
Real-world architecture patterns — retry, wizard, e-commerce, and more.
Advanced Patterns
This guide presents production-ready state machine patterns that solve common architectural challenges. Each pattern includes complete, runnable code.
Pattern 1: Retry with Exponential Backoff
A service call that retries on failure with increasing delays. After exhausting retries, it transitions to a permanent failure state.
JSON Configuration
{
"id": "retryMachine",
"initial": "idle",
"context": {
"retries": 0,
"maxRetries": 3,
"lastError": null,
"result": null
},
"states": {
"idle": {
"on": { "START": "attempting" }
},
"attempting": {
"invoke": {
"src": "apiCall",
"onDone": {
"target": "success",
"actions": "storeResult"
},
"onError": [
{
"target": "waiting",
"guard": "canRetry",
"actions": "incrementRetry"
},
{
"target": "failed",
"actions": "storeError"
}
]
}
},
"waiting": {
"after": { "1000": "attempting" }
},
"success": { "type": "final" },
"failed": { "type": "final" }
}
}
Logic Implementation
from xstate_statemachine import create_machine, MachineLogic, SyncInterpreter
def can_retry(context, event):
"""Guard: allow retry if under the max retry count."""
return context["retries"] < context["maxRetries"]
def increment_retry(interpreter, context, event, action_def):
"""Action: increment the retry counter and record the error."""
context["retries"] += 1
context["lastError"] = str(event.data) if hasattr(event, "data") else None
print(f"Retry {context['retries']}/{context['maxRetries']}")
def store_result(interpreter, context, event, action_def):
"""Action: store the successful result."""
context["result"] = event.data if hasattr(event, "data") else None
print(f"Success: {context['result']}")
def store_error(interpreter, context, event, action_def):
"""Action: store the final error after all retries exhausted."""
context["lastError"] = str(event.data) if hasattr(event, "data") else None
print(f"Failed permanently: {context['lastError']}")
def api_call(interpreter, context, event):
"""Service: simulates an API call that may fail."""
import random
if random.random() < 0.7: # 70% chance of failure
raise ConnectionError("Server unavailable")
return {"status": "ok", "data": [1, 2, 3]}
logic = MachineLogic(
actions={
"incrementRetry": increment_retry,
"storeResult": store_result,
"storeError": store_error,
},
guards={"canRetry": can_retry},
services={"apiCall": api_call},
)
How it works: When apiCall fails, the onError transition checks canRetry. If retries remain, it moves to waiting, which uses an after timer to re-enter attempting after 1 second. If retries are exhausted, the second onError transition (without a guard) catches it and moves to failed.
Tip: For true exponential backoff, use multiple
afterentries with computed delays, or dynamically modify the delay in your action based oncontext["retries"].
Pattern 2: Form Wizard with Validation
A multi-step form where forward navigation requires validation, but backward navigation is always permitted.
Pythonic API Implementation
from xstate_statemachine import State, StateMachine, guard, action
class FormWizard(StateMachine):
machine_id = "wizard"
initial_context = {
"step1_data": {},
"step2_data": {},
"step3_data": {},
"errors": [],
}
# --- States ---
step1 = State("step1", initial=True)
step2 = State("step2")
step3 = State("step3")
review = State("review")
submitted = State("submitted", final=True)
# --- Forward transitions (guarded) ---
next_1_2 = step1.to(step2, event="NEXT", guard="isStep1Valid", actions=["saveStepData"])
next_2_3 = step2.to(step3, event="NEXT", guard="isStep2Valid", actions=["saveStepData"])
next_3_r = step3.to(review, event="NEXT", actions=["saveStepData"])
do_submit = review.to(submitted, event="SUBMIT")
# --- Back transitions (always allowed) ---
back_2_1 = step2.to(step1, event="BACK")
back_3_2 = step3.to(step2, event="BACK")
back_r_3 = review.to(step3, event="BACK")
# --- Guards ---
@guard("isStep1Valid")
def check_step1(self, context, event):
"""Step 1 requires a non-empty name."""
return bool(context["step1_data"].get("name"))
@guard("isStep2Valid")
def check_step2(self, context, event):
"""Step 2 requires a valid email."""
email = context["step2_data"].get("email", "")
return "@" in email and "." in email
# --- Actions ---
@action("saveStepData")
def save_step_data(self, interpreter, context, event, action_def):
"""Save the current step's form data from the event payload."""
if hasattr(event, "payload") and event.payload:
current_states = interpreter.current_state_ids
for state_id in current_states:
step_name = state_id.split(".")[-1]
key = f"{step_name}_data"
if key in context:
context[key].update(event.payload)
Running the Wizard
from xstate_statemachine import SyncInterpreter
machine = FormWizard.create_machine()
interp = SyncInterpreter(machine).start()
# Step 1: Fill in name
interp.context["step1_data"]["name"] = "Alice"
interp.send("NEXT") # step1 -> step2 (guard passes)
print(interp.current_state_ids) # {'wizard.step2'}
# Step 2: Fill in email
interp.context["step2_data"]["email"] = "alice@example.com"
interp.send("NEXT") # step2 -> step3 (guard passes)
# Go back
interp.send("BACK") # step3 -> step2 (always allowed)
print(interp.current_state_ids) # {'wizard.step2'}
# Go forward again
interp.send("NEXT") # step2 -> step3
interp.send("NEXT") # step3 -> review
interp.send("SUBMIT") # review -> submitted (final)
interp.stop()
Pattern 3: E-Commerce Checkout (Nested + Services + Guards)
A realistic checkout flow with nested states for the checkout process, a payment service, and guard conditions.
JSON Configuration
{
"id": "ecommerce",
"initial": "browsing",
"context": {
"cart": [],
"total": 0,
"shippingAddress": null,
"paymentMethod": null,
"receipt": null,
"error": null
},
"states": {
"browsing": {
"on": {
"ADD_TO_CART": { "actions": "addItem" },
"REMOVE_FROM_CART": { "actions": "removeItem" },
"CHECKOUT": { "target": "checkout", "guard": "cartNotEmpty" }
}
},
"checkout": {
"initial": "shipping",
"states": {
"shipping": {
"on": {
"SUBMIT_ADDRESS": {
"target": "payment",
"actions": "saveAddress"
}
}
},
"payment": {
"on": {
"SUBMIT_PAYMENT": {
"target": "processing",
"actions": "savePaymentMethod"
}
}
},
"processing": {
"invoke": {
"src": "chargeCard",
"onDone": {
"target": "confirmation",
"actions": "saveReceipt"
},
"onError": {
"target": "payment",
"actions": "showPaymentError"
}
}
},
"confirmation": { "type": "final" }
},
"on": { "CANCEL": "browsing" },
"onDone": "orderComplete"
},
"orderComplete": { "type": "final" }
}
}
Logic Implementation
from xstate_statemachine import create_machine, MachineLogic, SyncInterpreter
def add_item(interpreter, context, event, action_def):
item = event.payload.get("item", {})
context["cart"].append(item)
context["total"] += item.get("price", 0)
def remove_item(interpreter, context, event, action_def):
item_id = event.payload.get("id")
context["cart"] = [i for i in context["cart"] if i.get("id") != item_id]
context["total"] = sum(i.get("price", 0) for i in context["cart"])
def save_address(interpreter, context, event, action_def):
context["shippingAddress"] = event.payload.get("address")
def save_payment_method(interpreter, context, event, action_def):
context["paymentMethod"] = event.payload.get("method")
def save_receipt(interpreter, context, event, action_def):
context["receipt"] = event.data if hasattr(event, "data") else "OK"
def show_payment_error(interpreter, context, event, action_def):
context["error"] = str(event.data) if hasattr(event, "data") else "Unknown"
def cart_not_empty(context, event):
return len(context["cart"]) > 0
def charge_card(interpreter, context, event):
"""Simulate payment processing."""
if not context.get("paymentMethod"):
raise ValueError("No payment method provided")
return {
"transaction_id": "txn_abc123",
"amount": context["total"],
"status": "charged"
}
logic = MachineLogic(
actions={
"addItem": add_item,
"removeItem": remove_item,
"saveAddress": save_address,
"savePaymentMethod": save_payment_method,
"saveReceipt": save_receipt,
"showPaymentError": show_payment_error,
},
guards={"cartNotEmpty": cart_not_empty},
services={"chargeCard": charge_card},
)
# Run the checkout flow
import json
with open("ecommerce.json") as f:
config = json.load(f)
# Or define config inline (as shown above)
machine = create_machine(config, logic=logic)
interp = SyncInterpreter(machine).start()
# Add items
interp.send("ADD_TO_CART", item={"id": "w1", "name": "Widget", "price": 29.99})
interp.send("ADD_TO_CART", item={"id": "g1", "name": "Gadget", "price": 49.99})
# Start checkout
interp.send("CHECKOUT")
print(interp.current_state_ids) # {'ecommerce.checkout.shipping'}
# Fill shipping
interp.send("SUBMIT_ADDRESS", address="123 Main St")
# Fill payment and process
interp.send("SUBMIT_PAYMENT", method="visa_4242")
# chargeCard service runs synchronously -> confirmation (final) -> onDone -> orderComplete
print(interp.current_state_ids) # {'ecommerce.orderComplete'}
print(f"Receipt: {interp.context['receipt']}")
interp.stop()
Pattern 4: Authentication Flow
A hierarchical authentication system with session timeouts, token refresh, and protected sub-states.
{
"id": "auth",
"initial": "loggedOut",
"context": {
"user": null,
"token": null,
"sessionStart": null,
"error": null
},
"states": {
"loggedOut": {
"on": {
"LOGIN": "authenticating"
}
},
"authenticating": {
"invoke": {
"src": "authenticate",
"onDone": {
"target": "loggedIn",
"actions": "storeCredentials"
},
"onError": {
"target": "loggedOut",
"actions": "storeAuthError"
}
}
},
"loggedIn": {
"initial": "dashboard",
"entry": "recordSessionStart",
"states": {
"dashboard": {
"on": {
"GO_PROFILE": "profile",
"GO_SETTINGS": "settings"
}
},
"profile": {
"on": {
"GO_DASHBOARD": "dashboard",
"GO_SETTINGS": "settings"
}
},
"settings": {
"on": {
"GO_DASHBOARD": "dashboard",
"GO_PROFILE": "profile"
}
},
"refreshing": {
"invoke": {
"src": "refreshToken",
"onDone": {
"target": "dashboard",
"actions": "updateToken"
},
"onError": {
"target": "sessionExpired"
}
}
},
"sessionExpired": { "type": "final" }
},
"on": {
"LOGOUT": "loggedOut",
"REFRESH_TOKEN": ".refreshing"
},
"after": {
"3600000": ".refreshing"
},
"onDone": "loggedOut"
}
}
}
from xstate_statemachine import create_machine, MachineLogic
import time
logic = MachineLogic(
actions={
"storeCredentials": lambda i, ctx, e, a: ctx.update({
"user": e.data.get("user") if hasattr(e, "data") and isinstance(e.data, dict) else None,
"token": e.data.get("token") if hasattr(e, "data") and isinstance(e.data, dict) else None,
}),
"storeAuthError": lambda i, ctx, e, a: ctx.update({
"error": str(e.data) if hasattr(e, "data") else "Auth failed"
}),
"recordSessionStart": lambda i, ctx, e, a: ctx.update({
"sessionStart": time.time()
}),
"updateToken": lambda i, ctx, e, a: ctx.update({
"token": e.data.get("token") if hasattr(e, "data") and isinstance(e.data, dict) else ctx["token"]
}),
},
services={
"authenticate": lambda i, ctx, e: {
"user": "alice",
"token": "jwt_token_abc"
},
"refreshToken": lambda i, ctx, e: {
"token": f"jwt_refreshed_{int(time.time())}"
},
},
)
Key concepts in this pattern:
- Hierarchical states:
loggedInhas nested sub-states (dashboard,profile,settings) - Session timeout: The
afteronloggedIntriggers token refresh after 1 hour onDoneescalation: WhensessionExpired(final) is reached, theloggedInstate’sonDonetransitions back tologgedOut
Pattern 5: CI/CD Pipeline
A linear pipeline with error recovery and service invocations at each stage.
from xstate_statemachine import create_machine, MachineLogic, SyncInterpreter
config = {
"id": "cicd",
"initial": "checkout",
"context": {
"commit_sha": "abc123",
"build_artifact": None,
"test_results": None,
"deploy_url": None,
"errors": []
},
"states": {
"checkout": {
"invoke": {
"src": "gitCheckout",
"onDone": {"target": "build", "actions": "logStage"},
"onError": {"target": "failed", "actions": "recordError"}
}
},
"build": {
"invoke": {
"src": "buildProject",
"onDone": {"target": "test", "actions": ["saveBuildArtifact", "logStage"]},
"onError": {"target": "failed", "actions": "recordError"}
}
},
"test": {
"invoke": {
"src": "runTests",
"onDone": {"target": "approvalGate", "actions": ["saveTestResults", "logStage"]},
"onError": {"target": "failed", "actions": "recordError"}
}
},
"approvalGate": {
"on": {
"APPROVE": {"target": "deploy", "guard": "isApprover"},
"REJECT": "failed"
}
},
"deploy": {
"invoke": {
"src": "deployToProduction",
"onDone": {"target": "success", "actions": ["saveDeployUrl", "logStage"]},
"onError": {"target": "rollback", "actions": "recordError"}
}
},
"rollback": {
"invoke": {
"src": "rollbackDeploy",
"onDone": "failed",
"onError": "failed"
}
},
"success": {"type": "final"},
"failed": {"type": "final"}
}
}
logic = MachineLogic(
actions={
"logStage": lambda i, ctx, e, a: print(
f"Stage complete: {list(i.current_state_ids)}"
),
"saveBuildArtifact": lambda i, ctx, e, a: ctx.update(
{"build_artifact": e.data if hasattr(e, "data") else None}
),
"saveTestResults": lambda i, ctx, e, a: ctx.update(
{"test_results": e.data if hasattr(e, "data") else None}
),
"saveDeployUrl": lambda i, ctx, e, a: ctx.update(
{"deploy_url": e.data if hasattr(e, "data") else None}
),
"recordError": lambda i, ctx, e, a: ctx["errors"].append(
str(e.data) if hasattr(e, "data") else "unknown"
),
},
guards={
"isApprover": lambda ctx, e: e.payload.get("approver") in [
"alice", "bob", "charlie"
],
},
services={
"gitCheckout": lambda i, ctx, e: {"ref": ctx["commit_sha"]},
"buildProject": lambda i, ctx, e: {"artifact": "build-abc123.tar.gz"},
"runTests": lambda i, ctx, e: {"passed": 142, "failed": 0, "skipped": 3},
"deployToProduction": lambda i, ctx, e: {"url": "https://app.example.com"},
"rollbackDeploy": lambda i, ctx, e: {"status": "rolled_back"},
},
)
machine = create_machine(config, logic=logic)
interp = SyncInterpreter(machine).start()
# Pipeline runs automatically through checkout -> build -> test -> approvalGate
print(f"Waiting at: {interp.current_state_ids}")
# {'cicd.approvalGate'}
# Approve deployment
interp.send("APPROVE", approver="alice")
# deploy service runs -> success
print(f"Final state: {interp.current_state_ids}")
# {'cicd.success'}
print(f"Deploy URL: {interp.context['deploy_url']}")
# {'url': 'https://app.example.com'}
interp.stop()
Pattern 6: Traffic Light Controller
A timer-based state machine that cycles through light phases with an emergency override.
from xstate_statemachine import create_machine, MachineLogic, SyncInterpreter
config = {
"id": "trafficLight",
"initial": "green",
"context": {
"mode": "normal",
"cycle_count": 0
},
"states": {
"green": {
"entry": "logState",
"after": { "5000": "yellow" },
"on": { "EMERGENCY": "emergencyRed" }
},
"yellow": {
"entry": "logState",
"after": { "2000": "red" },
"on": { "EMERGENCY": "emergencyRed" }
},
"red": {
"entry": ["logState", "incrementCycle"],
"after": { "5000": "green" },
"on": { "EMERGENCY": "emergencyRed" }
},
"emergencyRed": {
"entry": "activateEmergency",
"on": {
"RESUME": {
"target": "red",
"actions": "deactivateEmergency"
}
}
}
}
}
logic = MachineLogic(
actions={
"logState": lambda i, ctx, e, a: print(
f"Light: {list(i.current_state_ids)[0].split('.')[-1].upper()}"
),
"incrementCycle": lambda i, ctx, e, a: ctx.update(
{"cycle_count": ctx["cycle_count"] + 1}
),
"activateEmergency": lambda i, ctx, e, a: (
ctx.update({"mode": "emergency"}),
print("EMERGENCY MODE ACTIVATED")
),
"deactivateEmergency": lambda i, ctx, e, a: (
ctx.update({"mode": "normal"}),
print("Returning to normal operation")
),
},
)
machine = create_machine(config, logic=logic)
interp = SyncInterpreter(machine).start()
# Light: GREEN
# Simulate emergency during green
interp.send("EMERGENCY")
# EMERGENCY MODE ACTIVATED
print(interp.current_state_ids) # {'trafficLight.emergencyRed'}
# Resume normal operation
interp.send("RESUME")
# Returning to normal operation
# Light: RED
interp.stop()
Testing State Machines
State machines are highly testable because their behavior is fully deterministic. Here are patterns for testing with pytest.
Testing with SyncInterpreter
import pytest
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
@pytest.fixture
def toggle_machine():
"""Create a simple toggle machine for testing."""
config = {
"id": "toggle",
"initial": "inactive",
"context": {"toggleCount": 0},
"states": {
"inactive": {
"on": {
"TOGGLE": {
"target": "active",
"actions": "incrementCount"
}
}
},
"active": {
"on": {
"TOGGLE": {
"target": "inactive",
"actions": "incrementCount"
}
}
}
}
}
logic = MachineLogic(
actions={
"incrementCount": lambda i, ctx, e, a: ctx.update(
{"toggleCount": ctx["toggleCount"] + 1}
)
}
)
return create_machine(config, logic=logic)
class TestToggleMachine:
"""Test suite for the toggle state machine."""
def test_initial_state(self, toggle_machine):
"""Machine should start in the 'inactive' state."""
interp = SyncInterpreter(toggle_machine).start()
assert interp.current_state_ids == {"toggle.inactive"}
assert interp.context["toggleCount"] == 0
interp.stop()
def test_single_toggle(self, toggle_machine):
"""TOGGLE event should move from inactive to active."""
interp = SyncInterpreter(toggle_machine).start()
interp.send("TOGGLE")
assert interp.current_state_ids == {"toggle.active"}
assert interp.context["toggleCount"] == 1
interp.stop()
def test_double_toggle(self, toggle_machine):
"""Two TOGGLE events should return to inactive."""
interp = SyncInterpreter(toggle_machine).start()
interp.send("TOGGLE")
interp.send("TOGGLE")
assert interp.current_state_ids == {"toggle.inactive"}
assert interp.context["toggleCount"] == 2
interp.stop()
def test_unhandled_event(self, toggle_machine):
"""Unknown events should not change state."""
interp = SyncInterpreter(toggle_machine).start()
interp.send("UNKNOWN_EVENT")
assert interp.current_state_ids == {"toggle.inactive"}
interp.stop()
def test_send_multiple_events(self, toggle_machine):
"""send_events should process a batch of events."""
interp = SyncInterpreter(toggle_machine).start()
interp.send_events(["TOGGLE", "TOGGLE", "TOGGLE"])
assert interp.current_state_ids == {"toggle.active"}
assert interp.context["toggleCount"] == 3
interp.stop()
Testing Guards
import pytest
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
@pytest.fixture
def guarded_machine():
config = {
"id": "guarded",
"initial": "locked",
"context": {"pin": "1234", "attempts": 0},
"states": {
"locked": {
"on": {
"ENTER_PIN": [
{
"target": "unlocked",
"guard": "pinCorrect"
},
{
"target": "locked",
"actions": "incrementAttempts"
}
]
}
},
"unlocked": {
"on": {"LOCK": "locked"}
}
}
}
logic = MachineLogic(
guards={
"pinCorrect": lambda ctx, e: e.payload.get("pin") == ctx["pin"]
},
actions={
"incrementAttempts": lambda i, ctx, e, a: ctx.update(
{"attempts": ctx["attempts"] + 1}
)
},
)
return create_machine(config, logic=logic)
class TestGuards:
def test_correct_pin_unlocks(self, guarded_machine):
interp = SyncInterpreter(guarded_machine).start()
interp.send("ENTER_PIN", pin="1234")
assert interp.current_state_ids == {"guarded.unlocked"}
interp.stop()
def test_wrong_pin_stays_locked(self, guarded_machine):
interp = SyncInterpreter(guarded_machine).start()
interp.send("ENTER_PIN", pin="0000")
assert interp.current_state_ids == {"guarded.locked"}
assert interp.context["attempts"] == 1
interp.stop()
def test_multiple_wrong_attempts(self, guarded_machine):
interp = SyncInterpreter(guarded_machine).start()
interp.send("ENTER_PIN", pin="0000")
interp.send("ENTER_PIN", pin="1111")
interp.send("ENTER_PIN", pin="9999")
assert interp.current_state_ids == {"guarded.locked"}
assert interp.context["attempts"] == 3
interp.stop()
Testing Actions
import pytest
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
def test_action_modifies_context():
"""Verify that actions correctly modify the context."""
config = {
"id": "cart",
"initial": "empty",
"context": {"items": [], "total": 0},
"states": {
"empty": {
"on": {
"ADD_ITEM": {
"target": "hasItems",
"actions": "addItem"
}
}
},
"hasItems": {
"on": {
"ADD_ITEM": {"actions": "addItem"},
"CLEAR": {
"target": "empty",
"actions": "clearCart"
}
}
}
}
}
def add_item(interpreter, context, event, action_def):
item = event.payload.get("item", {})
context["items"].append(item)
context["total"] += item.get("price", 0)
def clear_cart(interpreter, context, event, action_def):
context["items"] = []
context["total"] = 0
logic = MachineLogic(
actions={"addItem": add_item, "clearCart": clear_cart}
)
machine = create_machine(config, logic=logic)
interp = SyncInterpreter(machine).start()
# Add items
interp.send("ADD_ITEM", item={"name": "Widget", "price": 10.0})
assert len(interp.context["items"]) == 1
assert interp.context["total"] == 10.0
interp.send("ADD_ITEM", item={"name": "Gadget", "price": 20.0})
assert len(interp.context["items"]) == 2
assert interp.context["total"] == 30.0
# Clear cart
interp.send("CLEAR")
assert interp.current_state_ids == {"cart.empty"}
assert interp.context["items"] == []
assert interp.context["total"] == 0
interp.stop()
Testing Transitions with get_next_state
The MachineNode provides a get_next_state() utility for pure transition testing without running an interpreter:
from xstate_statemachine import create_machine, Event
config = {
"id": "nav",
"initial": "home",
"states": {
"home": {"on": {"GO_ABOUT": "about", "GO_CONTACT": "contact"}},
"about": {"on": {"GO_HOME": "home"}},
"contact": {"on": {"GO_HOME": "home"}}
}
}
machine = create_machine(config)
# Test transition targets without running an interpreter
assert machine.get_next_state("nav.home", Event("GO_ABOUT")) == {"nav.about"}
assert machine.get_next_state("nav.home", Event("GO_CONTACT")) == {"nav.contact"}
assert machine.get_next_state("nav.about", Event("GO_HOME")) == {"nav.home"}
assert machine.get_next_state("nav.home", Event("UNKNOWN")) is None
Note:
get_next_state()does not evaluate guards. It shows the potential transition target assuming all guards pass.
Testing with Snapshots
Use snapshots to jump to a specific state for targeted testing:
import json
from xstate_statemachine import create_machine, SyncInterpreter
def test_checkout_from_payment():
"""Test the payment -> processing transition directly."""
config = {
"id": "shop",
"initial": "browsing",
"context": {"cart": ["item1"], "paid": False},
"states": {
"browsing": {"on": {"CHECKOUT": "payment"}},
"payment": {
"on": {"PAY": {"target": "complete", "actions": "markPaid"}}
},
"complete": {"type": "final"}
}
}
from xstate_statemachine import MachineLogic
logic = MachineLogic(
actions={
"markPaid": lambda i, ctx, e, a: ctx.update({"paid": True})
}
)
# Jump directly to payment state
snapshot = json.dumps({
"status": "running",
"context": {"cart": ["item1", "item2"], "paid": False},
"state_ids": ["shop.payment"]
})
machine = create_machine(config, logic=logic)
interp = SyncInterpreter.from_snapshot(snapshot, machine)
assert interp.current_state_ids == {"shop.payment"}
assert len(interp.context["cart"]) == 2
interp.send("PAY")
assert interp.current_state_ids == {"shop.complete"}
assert interp.context["paid"] is True
interp.stop()
Running Tests
# Run all tests
pytest tests/ -v
# Run a specific test file
pytest tests/test_patterns.py -v
# Run with coverage
pytest tests/ --cov=xstate_statemachine --cov-report=term-missing