Reference: Directive Contract¶
Complete API reference for the server-authored invalidation protocol
Directives are the contract between your server and Verity clients. They are small, semantic JSON messages that tell clients what truth-state changed and needs to be refreshed.
Conceptual Overview
This is the API reference. For conceptual understanding, see Directives concept.
Philosophy¶
Directives embody Verity's core separation of concerns:
- Server owns domain knowledge (what changed, what's affected)
- Verity orchestrates data synchronization (when to fetch, how to coalesce)
- View renders the result (how it looks)
The server never dictates DOM structure or component behavior. It simply declares: "This piece of truth-state changed."
Directive Types¶
refresh_collection¶
Invalidates and refetches a collection identified by name and optional parameters.
Schema¶
{
  op: "refresh_collection"
  name: string // Collection type name
  params?: Record<string, any> // Optional filter/query parameters
  params_mode?: "exact" | "contains" // Parameter matching mode (default: "exact")
  // Optional metadata
  idempotency_key?: string // Deduplication key
  timestamp?: number // When the change occurred (ms since epoch)
  audience?: string // Target audience (e.g., "user-123", "team-abc")
  source?: string // Client ID that originated this change
  seq?: number // Sequence number for ordering
  result?: any // Optional: include the updated data
}
Behavior¶
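Without params:
- Invalidates every cached collection with the given name (for example, every todos collection) regardless of parameters
- Each client refetches every cached (name, params) combination it holds
With params (exact mode):
- Invalidates only cached collections whose params exactly match, for example { status: "active" }
- Collections with { status: "completed" } or {} are unaffected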
With params (contains mode):
{
  "op": "refresh_collection",
  "name": "todos",
  "params": { "status": "active" },
  "params_mode": "contains"
}
- Invalidates every cached collection whose params contain { status: "active" }
- Matches { status: "active" }, { status: "active", project: 5 }, etc.
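The three matching behaviors above can be summarized in a few lines. This is a minimal sketch, not Verity's implementation: the function name `params_match` and its signature are hypothetical, and it assumes params are flat dictionaries compared by equality.

```python
from typing import Any, Mapping, Optional


def params_match(cached: Mapping[str, Any],
                 directive_params: Optional[Mapping[str, Any]],
                 mode: str = "exact") -> bool:
    """Return True if a cached collection's params are targeted by a directive.

    Hypothetical helper for illustration; not part of the Verity API.
    """
    if directive_params is None:
        # No params on the directive: every cached variant matches.
        return True
    if mode == "exact":
        return dict(cached) == dict(directive_params)
    if mode == "contains":
        # Every key/value pair in the directive must appear in the cached params.
        return all(k in cached and cached[k] == v
                   for k, v in directive_params.items())
    raise ValueError(f"unknown params_mode: {mode!r}")
```

Under these assumptions, a cached `{ status: "active", project: 5 }` collection is skipped by an exact-mode directive for `{ status: "active" }` but refreshed by the same directive in contains mode.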
Use Cases¶
| Scenario | Directive |
|---|---|
| Created a new item | { op: "refresh_collection", name: "todos" } |
| Deleted an item | { op: "refresh_collection", name: "todos" } |
| Updated item affecting sort/filter | { op: "refresh_collection", name: "todos", params: {...} } |
| Bulk operation | { op: "refresh_collection", name: "todos", idempotency_key: "bulk-xyz" } |
| Status-specific change | { op: "refresh_collection", name: "todos", params: { status: "active" } } |
Server Example¶
@app.post('/api/todos')
def create_todo():
    todo = Todo(title=request.json['title'], status='active')
    db.session.add(todo)
    db.session.commit()
    directives = [
        # Invalidate all todo collections
        { 'op': 'refresh_collection', 'name': 'todos' }
    ]
    emit_directives(directives)  # SSE fan-out
    return {
        'todo': todo.to_dict(),
        'directives': directives  # Immediate response
    }
refresh_item¶
Invalidates and refetches a specific item identified by type and ID.
Schema¶
{
  op: "refresh_item"
  name: string // Item type name
  id: string | number // Item identifier
  level?: string // Specific level to refresh
  // Optional metadata
  idempotency_key?: string
  timestamp?: number
  audience?: string
  source?: string
  seq?: number
  result?: any // Optional: include the updated data
}
Behavior¶
Without level:
- Invalidates every cached level of the item (for example, todo:42)
- Refetches those levels
- Uses level conversion graphs to minimize fetches
With level:
- Ensures the named level (for example, expanded) is fetched and fresh
- If the client holds simplified, it fetches expanded
- If the client already has expanded, it refetches to ensure freshness
Level Conversion Example¶
Suppose you have these level relationships: simplified can be derived from expanded, and both simplified and expanded can be derived from full.
Scenario 1: Client holds simplified
- Refetches the simplified level only
Scenario 2: Client holds expanded
// Directive: refresh todo:42 at expanded level
{ "op": "refresh_item", "name": "todo", "id": 42, "level": "expanded" }
- Refetches the expanded level
- simplified can be derived from expanded (if conversion defined)
- Net: 1 fetch
Scenario 3: Client holds simplified, needs full
- Fetches the full level
- Both expanded and simplified can be derived from full
- Net: 1 fetch
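To make the "Net: 1 fetch" reasoning concrete, here is a rough sketch of how a client could decide which levels to fetch. The `DERIVABLE_FROM` graph and the `levels_to_fetch` function are assumptions made for illustration; Verity's actual conversion-graph API is described in the Levels & Conversions reference and may differ.

```python
# Hypothetical conversion graph: level -> levels it can be derived from.
DERIVABLE_FROM = {
    "simplified": {"expanded", "full"},
    "expanded": {"full"},
    "full": set(),
}


def levels_to_fetch(held, requested=None):
    """Return the smallest set of levels to fetch so every target level is fresh.

    `held` is the set of levels the client currently caches; `requested` is the
    optional `level` from the directive.
    """
    targets = set(held)
    if requested is not None:
        targets.add(requested)
    # A level needs its own fetch only if no other target level can produce it.
    return {
        level for level in targets
        if not (DERIVABLE_FROM.get(level, set()) & targets)
    }
```

With this graph, a client holding only simplified that receives a directive for full fetches full once and derives the rest, which mirrors Scenario 3.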
Use Cases¶
| Scenario | Directive |
|---|---|
| Updated an item | { op: "refresh_item", name: "todo", id: 42 } |
| Server processing changed item | { op: "refresh_item", name: "todo", id: 42 } |
| Need specific detail level | { op: "refresh_item", name: "todo", id: 42, level: "full" } |
| Optimistic with payload | { op: "refresh_item", name: "todo", id: 42, result: {...} } |
Server Example¶
@app.put('/api/todos/<int:todo_id>')
def update_todo(todo_id):
    todo = Todo.query.get_or_404(todo_id)
    todo.title = request.json['title']
    todo.completed = request.json['completed']
    db.session.commit()
    directives = [
        # Refresh the item
        { 'op': 'refresh_item', 'name': 'todo', 'id': todo_id },
        # Also refresh collections (item might affect order/filters)
        { 'op': 'refresh_collection', 'name': 'todos' }
    ]
    emit_directives(directives)
    return {
        'todo': todo.to_dict(),
        'directives': directives
    }
invalidate¶
Wrapper directive that groups multiple directives. Rarely needed—most endpoints just return an array of directives.
Schema¶
{
  op: "invalidate"
  targets: Directive[] // Array of directives to apply
  // Optional metadata
  idempotency_key?: string
  timestamp?: number
  audience?: string
  source?: string
  seq?: number
}
Behavior¶
{
  "op": "invalidate",
  "targets": [
    { "op": "refresh_item", "name": "todo", "id": 42 },
    { "op": "refresh_collection", "name": "todos" }
  ]
}
Equivalent to:
[
  { "op": "refresh_item", "name": "todo", "id": 42 },
  { "op": "refresh_collection", "name": "todos" }
]
Use Cases¶
- Explicit grouping: When you want to emphasize that directives are related
- Conditional application: Client-side logic can check conditions before applying targets
- Metadata sharing: Apply the same idempotency_key or audience to multiple directives
When to use
In most cases, just return an array of directives. Use invalidate only when you need explicit grouping semantics or shared metadata.
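The equivalence between an invalidate wrapper and a plain array can be sketched as a flattening step. This is an illustrative helper, not Verity code: `flatten_directives` is a hypothetical name, and the rule that targets inherit the wrapper's metadata (with a target's own fields taking precedence) is an assumption based on the "metadata sharing" use case.

```python
# Metadata fields listed in the invalidate schema.
METADATA_FIELDS = ("idempotency_key", "timestamp", "audience", "source", "seq")


def flatten_directives(directives):
    """Expand invalidate wrappers into a flat list of directives.

    Assumption: each target inherits the wrapper's metadata unless it sets
    the same field itself.
    """
    flat = []
    for directive in directives:
        if directive.get("op") == "invalidate":
            shared = {k: directive[k] for k in METADATA_FIELDS if k in directive}
            for target in flatten_directives(directive.get("targets", [])):
                flat.append({**shared, **target})  # target's own fields win
        else:
            flat.append(dict(directive))
    return flat
```

A consumer that deduplicates by idempotency_key would likely need to check the wrapper's key once before unwrapping, since every flattened target ends up carrying the same key.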
Metadata Fields¶
All directive types support these optional metadata fields:
idempotency_key¶
Prevents duplicate processing of the same logical operation.
Behavior:
- Verity tracks recent idempotency keys (default: last 1000, 5-minute window)
- If the same key is seen twice, the second instance is ignored
- Useful for bulk operations or unreliable transports
Example:
import uuid
@app.post('/api/todos/bulk-complete')
def bulk_complete():
    operation_id = str(uuid.uuid4())
    # ... perform bulk update ...
    return {
        'directives': [{
            'op': 'refresh_collection',
            'name': 'todos',
            'idempotency_key': f'bulk-complete-{operation_id}'
        }]
    }
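For intuition about the receiving side, the deduplication behavior described above (last 1000 keys, 5-minute window) could look roughly like the following. This is a sketch under assumptions: the class name `IdempotencyTracker`, its parameters, and its eviction order are hypothetical and are not taken from Verity's source.

```python
import time
from collections import OrderedDict


class IdempotencyTracker:
    """Remember recently seen idempotency keys, bounded by count and age."""

    def __init__(self, max_keys=1000, window_seconds=300.0, clock=time.monotonic):
        self._seen = OrderedDict()  # key -> time first seen, oldest first
        self._max_keys = max_keys
        self._window = window_seconds
        self._clock = clock

    def should_process(self, key):
        """Return False if `key` was already seen inside the window."""
        if key is None:
            return True  # directives without a key are never deduplicated
        now = self._clock()
        # Drop keys that have aged out of the time window.
        while self._seen and now - next(iter(self._seen.values())) > self._window:
            self._seen.popitem(last=False)
        if key in self._seen:
            return False
        self._seen[key] = now
        # Enforce the size bound by evicting the oldest keys.
        while len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)
        return True
```

The injectable `clock` is only there to make the window testable without sleeping.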
timestamp¶
Records when the server-side change occurred (milliseconds since epoch).
Behavior:
- Informational only (Verity doesn't use it for ordering)
- Useful for debugging and audit logs
- Can be used by custom directive handlers
audience¶
Scopes the directive to specific users, teams, or tenant contexts.
Behavior:
- SSE clients subscribe to specific audiences
- Directives only delivered to clients with matching audience
- Default audience: "global"
Server Example:
@app.put('/api/users/<int:user_id>/settings')
def update_user_settings(user_id):
    # ... update settings ...
    directives = [{
        'op': 'refresh_item',
        'name': 'user_settings',
        'id': user_id
    }]
    # Only send to this specific user
    emit_directives(directives, audience=f'user-{user_id}')
    return { 'directives': directives }
Client Setup:
const registry = Verity.createRegistry({
  sse: {
    url: '/api/events',
    audience: `user-${currentUserId}` // Subscribe to user-specific events
  }
})
source¶
Identifies which client originated the change (auto-generated client ID).
Behavior:
- Automatically added by Verity when sending mutations
- Used to prevent double-application (client ignores its own SSE echoes)
- Set via X-Verity-Client-ID header
Flow:
1. Client A: PUT /api/todos/42 (header: X-Verity-Client-ID: client-abc)
2. Server: Returns directives with source: "client-abc"
3. Server: Emits to SSE with source: "client-abc"
4. Client A: Ignores SSE (source matches)
5. Client B: Processes SSE (source doesn't match)
seq¶
Sequence number for detecting missed messages in SSE streams.
{
  "type": "directives",
  "seq": 42,
  "audience": "global",
  "directives": [
    { "op": "refresh_collection", "name": "todos" }
  ]
}
Behavior:
- Server increments the sequence per audience
- Client tracks the expected next sequence
- If a gap is detected (expected 42, got 44), the client triggers a resync
- Resync: force-refresh all active collections
Server Implementation:
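_audience_seq = {}  # audience -> current seq
def _next_seq(audience):
    # Increment and return the per-audience counter (first value is 1)
    _audience_seq[audience] = _audience_seq.get(audience, 0) + 1
    return _audience_seq[audience]
def emit_directives(directives, audience='global', source=None):
    seq = _next_seq(audience)
    payload = {
        'type': 'directives',
        'seq': seq,
        'audience': audience,
        'directives': directives
    }
    if source is not None:
        payload['source'] = source  # lets the originating client skip its own echo
    _broadcast(payload, audience=audience)  # transport-specific fan-out, not shown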
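On the receiving side, the gap check described under Behavior can be sketched as a small tracker, one per subscribed audience. The class `SequenceTracker` and its `observe` method are hypothetical names used for illustration; how Verity handles stale or duplicate sequence numbers is not specified here, so ignoring them is an assumption.

```python
class SequenceTracker:
    """Detect missed messages by comparing each seq to the expected next one."""

    def __init__(self):
        self._expected = None  # unknown until the first message arrives

    def observe(self, seq):
        """Return True if a gap was detected and the client should resync."""
        if self._expected is None or seq == self._expected:
            self._expected = seq + 1
            return False
        if seq > self._expected:
            # One or more messages were missed (e.g. expected 42, got 44).
            self._expected = seq + 1
            return True
        # Stale or duplicate message: assumed safe to ignore.
        return False
```

When `observe` returns True, the client would force-refresh its active collections as described above.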
result¶
Includes the updated data inline, avoiding an immediate refetch.
{
  "op": "refresh_item",
  "name": "todo",
  "id": 42,
  "result": {
    "id": 42,
    "title": "Buy milk",
    "completed": true
  }
}
Behavior:
- Verity immediately applies the result to cache
- Marks the item as fresh (resets staleness timer)
- If client needs a different level, it still fetches
- Useful for "hot path" operations where server has the data
Use Case:
@app.put('/api/todos/<int:todo_id>/toggle')
def toggle_todo(todo_id):
    todo = Todo.query.get_or_404(todo_id)
    todo.completed = not todo.completed
    db.session.commit()
    # Include updated data to avoid refetch
    return {
        'directives': [{
            'op': 'refresh_item',
            'name': 'todo',
            'id': todo_id,
            'result': todo.to_dict()  # ← Client gets data immediately
        }]
    }
Level Matching
The result payload must match the level the client currently holds. If unsure, omit result and let Verity refetch.
Directive Transport¶
Pull Path: Mutation Response¶
The most common pattern: return directives directly in your mutation response.
// Client makes mutation
const response = await fetch('/api/todos/42', {
  method: 'PUT',
  headers: {
    'Content-Type': 'application/json',
    'X-Verity-Client-ID': registry.clientId // Tag with client ID
  },
  body: JSON.stringify({ title: 'New title' })
})
const payload = await response.json()
// Apply directives
if (payload.directives) {
  await registry.applyDirectives(payload.directives)
}
Response format:
{
  "todo": { "id": 42, "title": "New title" },
  "directives": [
    { "op": "refresh_item", "name": "todo", "id": 42 },
    { "op": "refresh_collection", "name": "todos" }
  ]
}
Push Path: Server-Sent Events (SSE)¶
For multi-client synchronization, emit directives over SSE.
Server Setup¶
from verity.shared.sse import emit_directives
@app.put('/api/todos/<int:todo_id>')
def update_todo(todo_id):
    # 1. Perform mutation
    todo = Todo.query.get_or_404(todo_id)
    todo.title = request.json['title']
    db.session.commit()
    # 2. Define directives
    directives = [
        { 'op': 'refresh_item', 'name': 'todo', 'id': todo_id },
        { 'op': 'refresh_collection', 'name': 'todos' }
    ]
    # 3. Emit to SSE (for other clients)
    emit_directives(
        directives,
        audience='global',
        source=request.headers.get('X-Verity-Client-ID')
    )
    # 4. Return to requesting client
    return { 'todo': todo.to_dict(), 'directives': directives }
Client Setup¶
const registry = Verity.createRegistry({
  sse: {
    url: '/api/events',
    audience: 'global',
    withCredentials: true, // For auth cookies
    initialRetryMs: 1000, // Reconnect delay
    maxRetryMs: 30000
  }
})
// Directives from SSE are applied automatically
// (excluding those with source === registry.clientId)
SSE Message Format¶
event: message
data: {"type":"directives","seq":42,"audience":"global","directives":[{"op":"refresh_collection","name":"todos"}]}
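As a rough illustration of how a server might produce the frame above, the helper below serializes a payload into the SSE wire format. The name `format_sse` is hypothetical; the only firm requirements come from the SSE format itself, namely a `data:` line holding the JSON on a single line and a blank line terminating the event.

```python
import json


def format_sse(payload, event="message"):
    """Serialize a directive payload as one Server-Sent Events frame."""
    # Compact separators keep the JSON on a single line with no extra spaces.
    data = json.dumps(payload, separators=(",", ":"))
    # A blank line (the trailing "\n\n") marks the end of the event.
    return f"event: {event}\ndata: {data}\n\n"
```

Each connected client's response stream would receive this string for every emitted payload that matches its audience.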
Push Path: Custom Transport¶
You can plug in WebSocket, GraphQL subscriptions, or any other transport.
registry.configureDirectiveSource({
  type: 'websocket',
  connect: ({ applyDirectives, clientId }) => {
    const ws = new WebSocket('wss://example.com/directives')
    ws.onmessage = (event) => {
      const { directives, source } = JSON.parse(event.data)
      // Ignore echoes from this client
      if (source !== clientId) {
        applyDirectives(directives)
      }
    }
    // Return cleanup function
    return () => ws.close()
  }
})
Common Patterns¶
Pattern: Create Item¶
@app.post('/api/todos')
def create_todo():
    todo = Todo(title=request.json['title'])
    db.session.add(todo)
    db.session.commit()
    return {
        'todo': todo.to_dict(),
        'directives': [
            # New item affects collection
            { 'op': 'refresh_collection', 'name': 'todos' }
        ]
    }
Pattern: Update Item¶
@app.put('/api/todos/<int:todo_id>')
def update_todo(todo_id):
    todo = Todo.query.get_or_404(todo_id)
    todo.title = request.json['title']
    db.session.commit()
    return {
        'todo': todo.to_dict(),
        'directives': [
            # Item changed
            { 'op': 'refresh_item', 'name': 'todo', 'id': todo_id },
            # Collection might be affected (sort order, filters)
            { 'op': 'refresh_collection', 'name': 'todos' }
        ]
    }
Pattern: Delete Item¶
@app.delete('/api/todos/<int:todo_id>')
def delete_todo(todo_id):
    todo = Todo.query.get_or_404(todo_id)
    db.session.delete(todo)
    db.session.commit()
    return {
        'directives': [
            # Don't refresh item (it's gone!)
            # Just refresh collection (item removed)
            { 'op': 'refresh_collection', 'name': 'todos' }
        ]
    }
Pattern: Status Change (Conditional Directives)¶
@app.put('/api/todos/<int:todo_id>')
def update_todo(todo_id):
    todo = Todo.query.get_or_404(todo_id)
    old_status = todo.status
    todo.status = request.json['status']
    db.session.commit()
    directives = [
        { 'op': 'refresh_item', 'name': 'todo', 'id': todo_id }
    ]
    # If status changed, refresh affected filtered collections
    if old_status != todo.status:
        directives.extend([
            # Refresh old status collection
            {
                'op': 'refresh_collection',
                'name': 'todos',
                'params': { 'status': old_status }
            },
            # Refresh new status collection
            {
                'op': 'refresh_collection',
                'name': 'todos',
                'params': { 'status': todo.status }
            }
        ])
    return { 'directives': directives }
Pattern: Cascading Updates¶
@app.put('/api/projects/<int:project_id>/archive')
def archive_project(project_id):
    project = Project.query.get_or_404(project_id)
    project.archived = True
    # Archiving cascades to tasks
    affected_task_ids = []
    for task in project.tasks:
        task.archived = True
        affected_task_ids.append(task.id)
    db.session.commit()
    directives = [
        # Refresh the project
        { 'op': 'refresh_item', 'name': 'project', 'id': project_id },
        { 'op': 'refresh_collection', 'name': 'projects' },
        # Refresh all affected tasks
        *[
            { 'op': 'refresh_item', 'name': 'task', 'id': task_id }
            for task_id in affected_task_ids
        ],
        # Refresh task collections
        { 'op': 'refresh_collection', 'name': 'tasks' }
    ]
    return { 'directives': directives }
Pattern: Bulk Operations with Idempotency¶
import uuid
@app.post('/api/todos/bulk-complete')
def bulk_complete():
    operation_id = str(uuid.uuid4())
    todo_ids = request.json['todo_ids']
    for todo_id in todo_ids:
        todo = Todo.query.get(todo_id)
        if todo:
            todo.completed = True
    db.session.commit()
    return {
        'directives': [{
            'op': 'refresh_collection',
            'name': 'todos',
            'idempotency_key': f'bulk-complete-{operation_id}'
        }]
    }
Pattern: Optimistic with Payload¶
@app.put('/api/todos/<int:todo_id>/toggle')
def toggle_todo(todo_id):
    """Hot path: toggle completion status"""
    todo = Todo.query.get_or_404(todo_id)
    todo.completed = not todo.completed
    db.session.commit()
    # Include updated data to avoid refetch
    return {
        'directives': [{
            'op': 'refresh_item',
            'name': 'todo',
            'id': todo_id,
            'result': todo.to_dict()  # ← Client gets instant update
        }]
    }
Best Practices¶
1. Be Explicit with Parameters¶
# ❌ Ambiguous
{ 'op': 'refresh_collection', 'name': 'todos' }
# (Refreshes every cached todos collection on each client, regardless of params)
# ✅ Explicit when possible
{ 'op': 'refresh_collection', 'name': 'todos', 'params': { 'status': 'active' } }
# (Only refreshes the specific filtered collection)
2. Include Affected Collections¶
# ❌ Incomplete
@app.put('/api/todos/<int:id>')
def update_todo(id):
    # ...
    return {
        'directives': [
            { 'op': 'refresh_item', 'name': 'todo', 'id': id }
            # Missing: collection refresh!
        ]
    }
# ✅ Complete
@app.put('/api/todos/<int:id>')
def update_todo(id):
    # ...
    return {
        'directives': [
            { 'op': 'refresh_item', 'name': 'todo', 'id': id },
            { 'op': 'refresh_collection', 'name': 'todos' }  # ← Don't forget
        ]
    }
3. Use Idempotency for Critical Operations¶
# ✅ For important bulk operations
import uuid
@app.post('/api/todos/bulk-delete')
def bulk_delete():
    operation_id = str(uuid.uuid4())
    # ...
    return {
        'directives': [{
            'op': 'refresh_collection',
            'name': 'todos',
            'idempotency_key': f'bulk-delete-{operation_id}'
        }]
    }
4. Use Payloads for Hot Paths¶
# ✅ For frequent operations where server has the data
@app.put('/api/todos/<int:id>/toggle')
def toggle_todo(id):
    todo = Todo.query.get_or_404(id)  # load the item before toggling it
    todo.completed = not todo.completed
    db.session.commit()
    return {
        'directives': [{
            'op': 'refresh_item',
            'name': 'todo',
            'id': id,
            'result': todo.to_dict()  # ← Skip refetch
        }]
    }
5. Scope with Audiences¶
# ✅ For user-specific or team-specific data
@app.put('/api/users/<int:user_id>/profile')
def update_profile(user_id):
    # ...
    emit_directives(
        [{ 'op': 'refresh_item', 'name': 'user', 'id': user_id }],
        audience=f'user-{user_id}'  # ← Only this user gets update
    )
Testing Directives¶
Unit Test: Verify Directive Emission¶
def test_update_todo_returns_directives():
    response = client.put('/api/todos/1', json={'title': 'New'})
    data = response.json
    assert 'directives' in data
    assert len(data['directives']) == 2
    # Check item directive
    assert data['directives'][0] == {
        'op': 'refresh_item',
        'name': 'todo',
        'id': 1
    }
    # Check collection directive
    assert data['directives'][1] == {
        'op': 'refresh_collection',
        'name': 'todos'
    }
Integration Test: Verify Refetch Triggered¶
// Browser test (Playwright, Cypress, etc.)
test('directive triggers refetch', async ({ page }) => {
  await page.goto('/todos')
  // Initial state
  await expect(page.locator('[data-todo-id="1"]')).toHaveText('Buy milk')
  // Start waiting before the click so a fast refetch is not missed
  const refetch = page.waitForResponse(resp =>
    resp.url().includes('/api/todos') && resp.request().method() === 'GET'
  )
  // Trigger update (returns directives)
  await page.click('[data-update-todo="1"]')
  // Wait for directive-triggered refetch
  await refetch
  // Verify UI updated
  await expect(page.locator('[data-todo-id="1"]')).toHaveText('Buy milk - DONE')
})
Summary¶
Core Directive Types:
| Type | Purpose | Required Fields |
|---|---|---|
| refresh_collection | Invalidate & refetch collection | op, name |
| refresh_item | Invalidate & refetch item | op, name, id |
| invalidate | Group multiple directives | op, targets |
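The required fields in the table above lend themselves to a simple server-side check before emitting. The sketch below is an illustrative helper rather than part of Verity: `validate_directive` and `REQUIRED_FIELDS` are hypothetical names, and it checks only field presence, not types.

```python
# Required fields per directive type, taken from the summary table.
REQUIRED_FIELDS = {
    "refresh_collection": ("op", "name"),
    "refresh_item": ("op", "name", "id"),
    "invalidate": ("op", "targets"),
}


def validate_directive(directive):
    """Return a list of problems; an empty list means required fields are present."""
    op = directive.get("op")
    if op not in REQUIRED_FIELDS:
        return [f"unknown op: {op!r}"]
    problems = [
        f"missing field: {field}"
        for field in REQUIRED_FIELDS[op]
        if field not in directive
    ]
    if op == "invalidate":
        # Validate nested targets recursively.
        for target in directive.get("targets", []):
            problems.extend(validate_directive(target))
    return problems
```

Running such a check in unit tests or in a development-only hook can catch a misspelled op or a forgotten id before it reaches clients.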
Key Metadata:
| Field | Purpose |
|---|---|
| idempotency_key | Prevent duplicate processing |
| timestamp | Record when change occurred |
| audience | Scope to specific users/teams |
| source | Identify originating client |
| seq | Detect missed messages |
| result | Include updated data inline |
Transport Paths:
- Pull (mutation response): Return directives in response body
- Push (SSE): Emit directives to SSE stream for fan-out
- Custom: Plug in WebSocket, GraphQL subscriptions, etc.
Next Steps¶
- Concepts: Directives concept guide
- Levels: Levels & Conversions for understanding refresh_item with levels
- Architecture: Backend of Frontend for full data flow
- Core API: Registry reference for applyDirectives() and SSE configuration