Services & Invoke
Async operations — API calls, database queries, and external integrations.
Services represent external operations — API calls, database queries, file reads, computations — that a state invokes when it is entered. When the service completes, the machine automatically transitions via onDone; if it fails, the machine transitions via onError.
What are Services?
A service is a callable that runs when a state is entered and produces a result (or an error). Services bridge the gap between your state machine’s declarative flow and the imperative world of I/O operations.
Key characteristics:
- Invoked by states — services are tied to a state’s
invokeproperty, not to transitions. - Result-driven — the return value becomes
event.datain theonDonehandler. - Error-aware — exceptions are caught and routed to
onErrorhandlers. - Sync or async —
SyncInterpreterrequires sync services;Interpretersupports async.
JSON Invoke Structure
The invoke property on a state defines which service to call and how to handle the result:
{
"invoke": {
"src": "fetchUser",
"id": "userFetcher",
"onDone": {
"target": "loaded",
"actions": "storeUser"
},
"onError": {
"target": "error",
"actions": "storeError"
}
}
}
| Key | Type | Description |
|---|---|---|
src |
string |
The service name (must match a registered service function) |
id |
string |
Optional unique identifier (defaults to the state’s ID) |
onDone |
object |
Transition to take on successful completion |
onError |
object |
Transition to take on failure (exception) |
Basic Invoke Example
A complete example of a user-loading machine:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "userLoader",
"initial": "idle",
"context": {"user": None, "error": None},
"states": {
"idle": {
"on": {"LOAD": "loading"}
},
"loading": {
"invoke": {
"src": "fetchUser",
"onDone": {
"target": "loaded",
"actions": "storeUser"
},
"onError": {
"target": "error",
"actions": "storeError"
}
}
},
"loaded": {"type": "final"},
"error": {"on": {"RETRY": "loading"}}
}
}
class UserLogic(MachineLogic):
def fetchUser(self, interpreter, context, event):
# Simulate an API call (sync version)
return {"id": 1, "name": "Alice", "email": "alice@example.com"}
def storeUser(self, interpreter, context, event, action_def):
context["user"] = event.data
def storeError(self, interpreter, context, event, action_def):
context["error"] = str(event.data)
machine = create_machine(config, logic=UserLogic())
interp = SyncInterpreter(machine).start()
interp.send("LOAD")
print(interp.context["user"])
# {"id": 1, "name": "Alice", "email": "alice@example.com"}
print(interp.current_state_ids)
# {"userLoader.loaded"}
interp.stop()
Service Implementation
Service Signature
def my_service(interpreter, context, event) -> Any:
...
| Parameter | Type | Description |
|---|---|---|
interpreter |
SyncInterpreter or Interpreter |
The running interpreter instance |
context |
dict |
The machine’s context (read-only recommended) |
event |
Event |
A synthetic event with invoke metadata |
Note: The service signature
(interpreter, context, event)differs from the action signature(interpreter, context, event, action_def)— services do not receiveaction_def.
Sync Service (with requests)
For use with SyncInterpreter:
class UserLogicSync(MachineLogic):
def fetchUser(self, interpreter, context, event):
import requests
user_id = context.get("userId", 1)
resp = requests.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
resp.raise_for_status()
return resp.json()
Async Service (with aiohttp)
For use with the async Interpreter:
class UserLogicAsync(MachineLogic):
async def fetchUser(self, interpreter, context, event):
import aiohttp
user_id = context.get("userId", 1)
async with aiohttp.ClientSession() as session:
resp = await session.get(
f"https://jsonplaceholder.typicode.com/users/{user_id}"
)
return await resp.json()
Warning: The
SyncInterpreterraisesNotSupportedErrorif you attempt to invoke anasync defservice. Use the asyncInterpreterfor async services.
onDone Handling
When a service completes successfully, the interpreter:
- Wraps the return value in a
DoneEventwithtype="done.invoke.<id>" - Sends that event to the machine
- The machine matches it against the
onDonetransition
Target State and Actions
"onDone": {
"target": "loaded",
"actions": "storeUser"
}
The onDone transition can include both a target state and actions — just like any other transition:
"onDone": {
"target": "loaded",
"actions": ["storeUser", "logSuccess", "clearLoading"]
}
onError Handling
When a service raises an exception, the interpreter:
- Catches the exception
- Wraps it in a
DoneEventwithtype="error.platform.<id>" - Sends that event to the machine
- The machine matches it against the
onErrortransition
Error State and Error Actions
"onError": {
"target": "error",
"actions": "storeError"
}
Accessing Service Results
In onDone actions, the service’s return value is available on event.data:
class Logic(MachineLogic):
def fetchUser(self, interpreter, context, event):
return {"id": 1, "name": "Alice", "role": "admin"}
def storeUser(self, interpreter, context, event, action_def):
# event.data is the return value from fetchUser
user = event.data
context["user"] = user
context["userName"] = user["name"]
print(f"Loaded user: {user['name']}")
Accessing Error Info
In onError actions, the exception object is available on event.data:
class Logic(MachineLogic):
def fetchUser(self, interpreter, context, event):
raise ConnectionError("API server unreachable")
def storeError(self, interpreter, context, event, action_def):
# event.data is the exception object
error = event.data
context["error"] = str(error)
context["errorType"] = type(error).__name__
print(f"Service failed: {error}")
# Output: Service failed: API server unreachable
Multiple Services (Array Form)
A state can invoke multiple services simultaneously by using an array:
{
"loading": {
"invoke": [
{
"src": "fetchUser",
"id": "userService",
"onDone": { "actions": "storeUser" },
"onError": { "actions": "storeUserError" }
},
{
"src": "fetchOrders",
"id": "ordersService",
"onDone": { "actions": "storeOrders" },
"onError": { "actions": "storeOrdersError" }
}
]
}
}
Note: Each invoke in the array needs a unique
idto distinguish itsonDone/onErrorevents. If noidis provided, it defaults to the state’s ID, which would cause collisions when multiple services are invoked.
class DataLogic(MachineLogic):
def fetchUser(self, interpreter, context, event):
return {"name": "Alice"}
def fetchOrders(self, interpreter, context, event):
return [{"id": 1, "total": 29.99}]
def storeUser(self, interpreter, context, event, action_def):
context["user"] = event.data
def storeOrders(self, interpreter, context, event, action_def):
context["orders"] = event.data
def storeUserError(self, interpreter, context, event, action_def):
context["userError"] = str(event.data)
def storeOrdersError(self, interpreter, context, event, action_def):
context["ordersError"] = str(event.data)
Service with Guards
You can add guards to onDone transitions to route based on the service result:
{
"loading": {
"invoke": {
"src": "fetchUser",
"onDone": [
{ "target": "adminDashboard", "guard": "isAdmin", "actions": "storeUser" },
{ "target": "userDashboard", "actions": "storeUser" }
],
"onError": {
"target": "error"
}
}
}
}
class Logic(MachineLogic):
def fetchUser(self, interpreter, context, event):
return {"name": "Alice", "role": "admin"}
def storeUser(self, interpreter, context, event, action_def):
context["user"] = event.data
def isAdmin(self, context, event):
# event.data holds the service result
user = event.data
return isinstance(user, dict) and user.get("role") == "admin"
Invoke with Timeout
Combine invoke with after to implement service timeouts. If the service doesn’t complete before the timer fires, the machine transitions to a timeout state:
{
"loading": {
"invoke": {
"src": "fetchUser",
"onDone": { "target": "loaded", "actions": "storeUser" },
"onError": { "target": "error" }
},
"after": {
"5000": { "target": "timeout" }
}
}
}
Tip: The
aftertimer is cancelled when the state is exited (e.g., whenonDonefires first), so there is no conflict between the two.
Pythonic Services
@service Decorator
The @service decorator marks a function as a service:
from xstate_statemachine import service
@service
def fetch_user(interpreter, context, event):
return {"id": 1, "name": "Alice"}
# Registered as "fetchUser" (auto snake_case → camelCase)
Explicit Naming
@service("loadUserProfile")
def get_user(interpreter, context, event):
return {"id": 1, "name": "Alice"}
# Registered as "loadUserProfile"
Service in StateMachine Class
from xstate_statemachine import State, StateMachine, SyncInterpreter, service, action
class UserLoader(StateMachine):
machine_id = "userLoader"
initial_context = {"user": None, "error": None}
idle = State("idle", initial=True, on={"LOAD": "loading"})
loading = State("loading", invoke={
"src": "fetchUser",
"onDone": {"target": "loaded", "actions": "storeUser"},
"onError": {"target": "error", "actions": "storeError"}
})
loaded = State("loaded", final=True)
error = State("error", on={"RETRY": "loading"})
@service
def fetch_user(self, interpreter, context, event):
return {"id": 1, "name": "Alice"}
@action
def store_user(self, interpreter, context, event, action_def):
context["user"] = event.data
@action
def store_error(self, interpreter, context, event, action_def):
context["error"] = str(event.data)
machine = UserLoader.create_machine()
interp = SyncInterpreter(machine).start()
interp.send("LOAD")
print(interp.context["user"]) # {"id": 1, "name": "Alice"}
interp.stop()
Service in MachineBuilder
from xstate_statemachine import MachineBuilder, SyncInterpreter
def fetch_user_fn(interpreter, context, event):
return {"id": 1, "name": "Alice"}
def store_user_fn(interpreter, context, event, action_def):
context["user"] = event.data
machine = (
MachineBuilder("userLoader")
.context({"user": None, "error": None})
.state("idle", initial=True, on={"LOAD": "loading"})
.state("loading", invoke={
"src": "fetchUser",
"onDone": {"target": "loaded", "actions": "storeUser"},
"onError": {"target": "error"}
})
.state("loaded", final=True)
.state("error", on={"RETRY": "loading"})
.service("fetchUser", fetch_user_fn)
.action("storeUser", store_user_fn)
.build()
)
interp = SyncInterpreter(machine).start()
interp.send("LOAD")
print(interp.context["user"]) # {"id": 1, "name": "Alice"}
interp.stop()
Service Lifecycle
When the interpreter enters a state with invoke, the following sequence occurs:
1. Enter the state (run entry actions)
2. Start the service (call the service function)
3. Service completes:
a. Success → send "done.invoke.<id>" event with return value
b. Failure → send "error.platform.<id>" event with exception
4. The machine processes the done/error event
5. Transition to onDone/onError target (run exit actions, transition actions, entry actions)
┌─────────────┐ ┌──────────────────┐ ┌──────────────┐
│ idle │────▶│ loading │────▶│ loaded │
│ │LOAD │ invoke:fetchUser │Done │ │
└─────────────┘ │ │────▶│ (final) │
│ │ └──────────────┘
│ │
│ │Error ┌──────────────┐
│ │────▶│ error │
└──────────────────┘ │ RETRY→loading│
└──────────────┘
Service Cancellation
When the machine exits a state that has an active invocation, the service is automatically cancelled:
- Async
Interpreter: The invoked task is cancelled viaasyncio.Task.cancel(). The service’sasyncio.CancelledErroris suppressed — noonErroris triggered. SyncInterpreter: Since sync services run to completion duringsend(), cancellation applies only to timers associated with the invoked state. If the state exits before a timer fires, the timer is discarded.
This means you can safely combine invoke with after timeouts: if the service completes first, the timer is cancelled when the state exits. If the timer fires first and causes a transition, the async service task is cancelled.
# Safe pattern: invoke + timeout
"loading": {
"invoke": {
"src": "fetchUser",
"onDone": {"target": "loaded"},
"onError": {"target": "error"}
},
"after": {
"5000": {"target": "timeout"}
}
}
# Whichever completes first wins — the loser is automatically cancelled
Note: Cancellation is automatic and requires no cleanup code. This is one of the key benefits of using
invokeover manual service management.
Event Naming for Done and Error
The interpreter uses a specific naming convention for invoke-related events:
| Event | Format | Example |
|---|---|---|
| Success | done.invoke.<id> |
done.invoke.userFetcher |
| Error | error.platform.<id> |
error.platform.userFetcher |
The <id> defaults to the state’s name if no explicit id is provided in the invoke config. When using multiple invokes per state, always provide explicit id values to avoid event name collisions:
"invoke": [
{ "src": "fetchA", "id": "serviceA", "onDone": ... },
{ "src": "fetchB", "id": "serviceB", "onDone": ... }
]
Sync vs Async Services
| Feature | SyncInterpreter |
Interpreter (async) |
|---|---|---|
| Service type | Regular def |
async def |
| Execution | Blocks until complete | Awaited concurrently |
| HTTP library | requests, urllib |
aiohttp, httpx |
| Multiple invokes | Sequential | Can run concurrently |
| Async service? | Raises NotSupportedError |
Fully supported |
Complete Example: User Data Loader with Retry
A production-style pattern with retry logic and error tracking:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "userDataLoader",
"initial": "idle",
"context": {
"userId": 42,
"user": None,
"error": None,
"retryCount": 0,
"maxRetries": 3
},
"states": {
"idle": {
"on": {"FETCH": "loading"}
},
"loading": {
"entry": "incrementRetry",
"invoke": {
"src": "fetchUserData",
"onDone": {
"target": "success",
"actions": "storeUser"
},
"onError": [
{"target": "loading", "guard": "canRetry", "actions": "logRetry"},
{"target": "failed", "actions": "storeFinalError"}
]
}
},
"success": {
"entry": "resetRetryCount",
"type": "final"
},
"failed": {
"on": {
"RESET": {"target": "idle", "actions": "resetAll"}
}
}
}
}
class UserDataLogic(MachineLogic):
def __init__(self):
super().__init__()
self._call_count = 0
# ---- Service ----
def fetchUserData(self, interpreter, context, event):
self._call_count += 1
user_id = context.get("userId", 1)
# Simulate: fail first 2 attempts, succeed on 3rd
if self._call_count < 3:
raise ConnectionError(
f"Attempt {self._call_count}: Connection refused"
)
return {"id": user_id, "name": "Alice", "email": "alice@example.com"}
# ---- Actions ----
def storeUser(self, interpreter, context, event, action_def):
context["user"] = event.data
context["error"] = None
print(f"User loaded: {event.data['name']}")
def incrementRetry(self, interpreter, context, event, action_def):
context["retryCount"] = context.get("retryCount", 0) + 1
print(f"Loading attempt #{context['retryCount']}...")
def logRetry(self, interpreter, context, event, action_def):
print(f" Retrying... ({event.data})")
def storeFinalError(self, interpreter, context, event, action_def):
context["error"] = str(event.data)
print(f"All retries exhausted. Error: {event.data}")
def resetRetryCount(self, interpreter, context, event, action_def):
context["retryCount"] = 0
def resetAll(self, interpreter, context, event, action_def):
context["user"] = None
context["error"] = None
context["retryCount"] = 0
# ---- Guards ----
def canRetry(self, context, event):
return context.get("retryCount", 0) < context.get("maxRetries", 3)
machine = create_machine(config, logic=UserDataLogic())
interp = SyncInterpreter(machine).start()
interp.send("FETCH")
# Output:
# Loading attempt #1...
# Retrying... (Attempt 1: Connection refused)
# Loading attempt #2...
# Retrying... (Attempt 2: Connection refused)
# Loading attempt #3...
# User loaded: Alice
print(interp.context["user"])
# {"id": 42, "name": "Alice", "email": "alice@example.com"}
interp.stop()
Complete Example: Payment Processing Flow
A multi-stage payment flow with validation, charging, and confirmation:
from xstate_statemachine import create_machine, SyncInterpreter, MachineLogic
config = {
"id": "paymentProcessor",
"initial": "idle",
"context": {
"amount": 0,
"currency": "USD",
"paymentMethod": None,
"transactionId": None,
"error": None,
"receipt": None
},
"states": {
"idle": {
"on": {
"START_PAYMENT": {
"target": "validating",
"actions": "storePaymentDetails"
}
}
},
"validating": {
"invoke": {
"src": "validatePayment",
"onDone": {"target": "charging"},
"onError": {"target": "validationFailed", "actions": "storeError"}
}
},
"charging": {
"invoke": {
"src": "chargePayment",
"onDone": {
"target": "confirming",
"actions": "storeTransactionId"
},
"onError": {"target": "chargeFailed", "actions": "storeError"}
}
},
"confirming": {
"invoke": {
"src": "generateReceipt",
"onDone": {
"target": "completed",
"actions": "storeReceipt"
},
"onError": {
"target": "completed",
"actions": "logReceiptError"
}
}
},
"completed": {
"entry": "notifySuccess",
"type": "final"
},
"validationFailed": {
"entry": "notifyValidationError",
"on": {
"RETRY": {"target": "idle", "actions": "clearError"}
}
},
"chargeFailed": {
"entry": "notifyChargeError",
"on": {
"RETRY": {"target": "idle", "actions": "clearError"}
}
}
}
}
class PaymentLogic(MachineLogic):
# ---- Services ----
def validatePayment(self, interpreter, context, event):
amount = context.get("amount", 0)
method = context.get("paymentMethod")
if amount <= 0:
raise ValueError("Amount must be positive")
if not method:
raise ValueError("Payment method is required")
return {"valid": True, "method": method}
def chargePayment(self, interpreter, context, event):
amount = context["amount"]
print(f"Charging ${amount:.2f}...")
# Simulate a charge — returns a transaction ID
return {"transactionId": "TXN-20260323-001", "charged": amount}
def generateReceipt(self, interpreter, context, event):
txn_id = context.get("transactionId", "UNKNOWN")
return {
"receiptId": f"RCP-{txn_id}",
"amount": context["amount"],
"currency": context["currency"],
"status": "paid"
}
# ---- Actions ----
def storePaymentDetails(self, interpreter, context, event, action_def):
context["amount"] = event.payload.get("amount", 0)
context["currency"] = event.payload.get("currency", "USD")
context["paymentMethod"] = event.payload.get("method")
def storeTransactionId(self, interpreter, context, event, action_def):
context["transactionId"] = event.data.get("transactionId")
def storeReceipt(self, interpreter, context, event, action_def):
context["receipt"] = event.data
def storeError(self, interpreter, context, event, action_def):
context["error"] = str(event.data)
def clearError(self, interpreter, context, event, action_def):
context["error"] = None
def logReceiptError(self, interpreter, context, event, action_def):
print(f"Receipt generation failed (non-critical): {event.data}")
def notifySuccess(self, interpreter, context, event, action_def):
txn = context.get("transactionId", "N/A")
amt = context.get("amount", 0)
print(f"Payment complete! Transaction: {txn}, Amount: ${amt:.2f}")
def notifyValidationError(self, interpreter, context, event, action_def):
print(f"Validation failed: {context.get('error', 'Unknown')}")
def notifyChargeError(self, interpreter, context, event, action_def):
print(f"Charge failed: {context.get('error', 'Unknown')}")
# Run the payment flow
machine = create_machine(config, logic=PaymentLogic())
interp = SyncInterpreter(machine).start()
interp.send("START_PAYMENT", amount=49.99, method="credit_card", currency="USD")
# Output:
# Charging $49.99...
# Payment complete! Transaction: TXN-20260323-001, Amount: $49.99
print(interp.context["transactionId"]) # TXN-20260323-001
print(interp.context["receipt"])
# {"receiptId": "RCP-TXN-20260323-001", "amount": 49.99, "currency": "USD", "status": "paid"}
interp.stop()
See Also
- Context — services often populate context via
onDoneactions - Guards — guard
onDonetransitions to route based on service results - Actions — actions that process service results in
onDone/onError - Delayed Transitions — combine
invokewithafterfor timeout patterns - Actors — spawn child machines as invoked services
- Interpreters — sync vs async interpreter behavior with services
- Pythonic API —
@servicedecorator andState(invoke=...)syntax