Making Your Workflow Event-Driven
Overview
Sometimes you want your workflows to kick into action when something happens in the outside world. Maybe an order comes in, a temperature sensor hits a threshold, or your cat posts a new Instagram photo. Here’s how to make your workflows respond to events, not just commands.
Creating an Event Dispatcher
First, let’s build a robust event dispatcher that can trigger workflows when events occur and persist trigger configurations:
# event_dispatcher.py
import asyncio
import json
import os
from datetime import datetime
from typing import Dict, List, Any, Optional
from grapheteria import WorkflowEngine
class EventDispatcher:
"""Manages event subscriptions and dispatches events to workflows."""
def __init__(self, storage_path="events"):
self.storage_path = storage_path
self.subscriptions = {} # event_type -> list of workflow IDs
self.active_workflows = {} # (workflow_id, run_id) -> WorkflowEngine
# Create storage directory if it doesn't exist
os.makedirs(storage_path, exist_ok=True)
# Load existing subscriptions
self.load_triggers()
def subscribe(self, event_type: str, workflow_id: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Subscribe a workflow to an event type with optional configuration."""
if event_type not in self.subscriptions:
self.subscriptions[event_type] = []
# Create subscription record
subscription = {
"workflow_id": workflow_id,
"config": config or {},
"created_at": datetime.now().isoformat()
}
self.subscriptions[event_type].append(subscription)
# Persist to storage
self.save_triggers()
return subscription
def unsubscribe(self, event_type: str, workflow_id: str) -> bool:
"""Unsubscribe a workflow from an event type."""
if event_type not in self.subscriptions:
return False
original_length = len(self.subscriptions[event_type])
self.subscriptions[event_type] = [
sub for sub in self.subscriptions[event_type]
if sub["workflow_id"] != workflow_id
]
# If the list is empty, remove the event type
if not self.subscriptions[event_type]:
del self.subscriptions[event_type]
# Persist changes
self.save_triggers()
# Return true if something was removed
return len(self.subscriptions.get(event_type, [])) < original_length
def get_subscriptions(self, event_type: str = None) -> Dict[str, List[Dict[str, Any]]]:
"""Get all subscriptions or subscriptions for a specific event type."""
if event_type:
return {event_type: self.subscriptions.get(event_type, [])}
return self.subscriptions
def save_triggers(self) -> None:
"""Save trigger configurations to filesystem."""
triggers_file = os.path.join(self.storage_path, "triggers.json")
# Use atomic write operation
temp_file = f"{triggers_file}.tmp"
with open(temp_file, 'w') as f:
json.dump(self.subscriptions, f, indent=2)
# Atomic rename
os.rename(temp_file, triggers_file)
def load_triggers(self) -> None:
"""Load trigger configurations from filesystem."""
triggers_file = os.path.join(self.storage_path, "triggers.json")
if os.path.exists(triggers_file):
try:
with open(triggers_file, 'r') as f:
self.subscriptions = json.load(f)
except json.JSONDecodeError:
# Handle corrupted file
self.subscriptions = {}
else:
self.subscriptions = {}
async def dispatch(self, event_type: str, payload: Any = None) -> List[Dict[str, Any]]:
"""Dispatch an event to all subscribed workflows."""
results = []
if event_type not in self.subscriptions:
return results
for subscription in self.subscriptions[event_type]:
workflow_id = subscription["workflow_id"]
config = subscription["config"]
# Create workflow instance
workflow = WorkflowEngine(workflow_id=workflow_id)
run_id = workflow.run_id
# Store in active workflows
self.active_workflows[(workflow_id, run_id)] = workflow
# Add event data to shared state
workflow.execution_state.shared.update({
"event_type": event_type,
"event_payload": payload,
"event_config": config,
"event_time": datetime.now().isoformat()
})
# Run the workflow
continuing, tracking_data = await workflow.run()
results.append({
"workflow_id": workflow_id,
"run_id": run_id,
"status": workflow.execution_state.workflow_status.name
})
return results
Adding Event Routes to FastAPI
Now, let’s extend your FastAPI application to handle event subscriptions and triggers:
# routes.py
from fastapi import APIRouter, HTTPException, Body
from typing import Dict, Any, Optional
from pydantic import BaseModel
from event_dispatcher import EventDispatcher
router = APIRouter()
event_dispatcher = EventDispatcher()
# Event subscription model
class EventSubscription(BaseModel):
workflow_id: str
config: Optional[Dict[str, Any]] = None
@router.post("/events/subscribe/{event_type}")
async def subscribe_to_event(event_type: str, subscription: EventSubscription):
"""Subscribe a workflow to an event type."""
result = event_dispatcher.subscribe(
event_type,
subscription.workflow_id,
subscription.config
)
return {
"message": f"Workflow {subscription.workflow_id} subscribed to event {event_type}",
"subscription": result
}
@router.delete("/events/unsubscribe/{event_type}")
async def unsubscribe_from_event(event_type: str, workflow_id: str):
"""Unsubscribe a workflow from an event type."""
success = event_dispatcher.unsubscribe(event_type, workflow_id)
if not success:
raise HTTPException(status_code=404, detail=f"No subscription found for event {event_type} and workflow {workflow_id}")
return {
"message": f"Workflow {workflow_id} unsubscribed from event {event_type}"
}
@router.get("/events")
async def list_events():
"""List all event subscriptions."""
return event_dispatcher.get_subscriptions()
@router.get("/events/{event_type}")
async def get_event_subscriptions(event_type: str):
"""Get subscriptions for a specific event type."""
subscriptions = event_dispatcher.get_subscriptions(event_type)
if not subscriptions.get(event_type):
raise HTTPException(status_code=404, detail=f"No subscriptions found for event {event_type}")
return subscriptions
@router.post("/events/trigger/{event_type}")
async def trigger_event(event_type: str, payload: Optional[Dict[str, Any]] = Body(None)):
"""Trigger an event, executing all subscribed workflows."""
results = await event_dispatcher.dispatch(event_type, payload)
return {
"message": f"Event {event_type} triggered",
"workflows_executed": len(results),
"results": results
}
Creating Webhook Triggers with Verification
For secure communication with external systems, let’s add webhook support with signature verification:
# webhook_routes.py
from fastapi import APIRouter, HTTPException, Request, Header, Depends
import json
import hmac
import hashlib
import os
from event_dispatcher import EventDispatcher
router = APIRouter()
event_dispatcher = EventDispatcher()
# Get webhook secret from environment (should be properly managed in production)
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "change-me-in-production")
async def verify_webhook_signature(
request: Request,
x_signature: Optional[str] = Header(None, alias="X-Webhook-Signature")
):
"""Verify the webhook signature for authenticity."""
if not x_signature:
# You may want to make this mandatory in production
return True
# Get request body for signature validation
body = await request.body()
# Compute expected signature
expected_signature = hmac.new(
WEBHOOK_SECRET.encode(),
body,
hashlib.sha256
).hexdigest()
# Verify using constant-time comparison
if not hmac.compare_digest(expected_signature, x_signature):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
return True
@router.post("/webhooks/{event_type}")
async def webhook_handler(
event_type: str,
request: Request,
verified: bool = Depends(verify_webhook_signature)
):
"""Handle webhook calls for specific event types with signature verification."""
try:
# Parse the payload
body = await request.body()
if not body:
payload = {}
else:
try:
payload = json.loads(body)
except json.JSONDecodeError:
payload = {"raw_body": body.decode("utf-8", errors="replace")}
# Add headers and query params to payload
payload["headers"] = dict(request.headers)
payload["query_params"] = dict(request.query_params)
# Dispatch the event
results = await event_dispatcher.dispatch(event_type, payload)
return {
"status": "success",
"event_type": event_type,
"workflows_triggered": len(results)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Webhook processing failed: {str(e)}")
Scheduled Events with Background Tasks
Want to trigger workflows on a schedule? Let’s use FastAPI’s background tasks:
# scheduler.py
from fastapi import FastAPI, BackgroundTasks, HTTPException, Body
import asyncio
from datetime import datetime
from typing import Dict, Any, Optional
from event_dispatcher import EventDispatcher
app = FastAPI()
event_dispatcher = EventDispatcher()
scheduled_tasks = {}
async def run_scheduled_event(event_type, payload, interval_seconds):
"""Run an event on a schedule."""
while True:
try:
await event_dispatcher.dispatch(event_type, payload)
# Update payload with last_run timestamp
if isinstance(payload, dict):
payload["last_run"] = datetime.now().isoformat()
except Exception as e:
print(f"Error in scheduled event {event_type}: {str(e)}")
await asyncio.sleep(interval_seconds)
@app.post("/schedule/{event_type}")
async def schedule_event(
event_type: str,
interval_seconds: int,
background_tasks: BackgroundTasks,
payload: Optional[Dict[str, Any]] = Body(None)
):
"""Schedule an event to run periodically."""
if event_type in scheduled_tasks:
return {"message": f"Event {event_type} already scheduled"}
if interval_seconds < 5: # Reasonable minimum in production
raise HTTPException(status_code=400, detail="Interval must be at least 5 seconds")
# Initialize payload if None
actual_payload = payload or {}
actual_payload["scheduled_at"] = datetime.now().isoformat()
task = asyncio.create_task(run_scheduled_event(event_type, actual_payload, interval_seconds))
scheduled_tasks[event_type] = {
"task": task,
"interval": interval_seconds,
"payload": actual_payload,
"started_at": datetime.now().isoformat()
}
return {
"message": f"Event {event_type} scheduled to run every {interval_seconds} seconds",
"event_type": event_type,
"interval_seconds": interval_seconds
}
@app.delete("/schedule/{event_type}")
async def cancel_scheduled_event(event_type: str):
"""Cancel a scheduled event."""
if event_type not in scheduled_tasks:
raise HTTPException(status_code=404, detail=f"No scheduled event found for {event_type}")
task_info = scheduled_tasks[event_type]
task_info["task"].cancel()
del scheduled_tasks[event_type]
return {
"message": f"Scheduled event {event_type} cancelled",
"event_type": event_type
}
@app.get("/schedule")
async def list_scheduled_events():
"""List all scheduled events."""
result = {}
for event_type, task_info in scheduled_tasks.items():
result[event_type] = {
"interval_seconds": task_info["interval"],
"started_at": task_info["started_at"],
"payload": task_info["payload"]
}
return result
Using the Event System
Here’s how to put all this together:
# main.py
from fastapi import FastAPI
from routes import router as api_router
from webhook_routes import router as webhook_router
from scheduler import app as scheduler_app
app = FastAPI()
# Regular API routes
app.include_router(api_router, prefix="/api")
# Webhook routes - no prefix for easier external access
app.include_router(webhook_router)
# Include scheduler routes
app.include_router(scheduler_app, prefix="/api")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Practical Examples
Example 1: Responding to a Payment Event
Subscribe to payment events:
import requests
# Register a workflow to process orders when payment is received
response = requests.post(
"https://your-api.com/api/events/subscribe/payment_received",
json={
"workflow_id": "order_processing",
"config": {
"priority": "high",
"notify_on_completion": True
}
}
)
Trigger a payment event:
import requests
# Trigger the event when payment is confirmed
response = requests.post(
"https://your-api.com/api/events/trigger/payment_received",
json={
"order_id": "12345",
"amount": 99.99,
"currency": "USD",
"payment_method": "credit_card"
}
)
Example 2: Setting Up a GitHub Webhook with Signature Verification
- Generate a secure webhook secret and configure it in your environment:
export WEBHOOK_SECRET=your-secure-secret-key
- Configure your GitHub repository webhook settings:
- URL:
https://your-api.com/webhooks/github_push
- Content type:
application/json
- Secret: Same as your
WEBHOOK_SECRET
- Events: Select “Push” events
- URL:
- Subscribe your workflow:
import requests response = requests.post( "https://your-api.com/api/events/subscribe/github_push", json={ "workflow_id": "auto_deploy", "config": { "branches": ["main", "production"], "deploy_target": "production" } } )
- Verify your webhook is working:
import requests response = requests.get( "https://your-api.com/webhooks/github_push/test" )
Example 3: Scheduling a Daily Report
import requests
# Schedule a report generation event to run daily
response = requests.post(
"https://your-api.com/api/schedule/generate_daily_report",
json={
"interval_seconds": 86400, # 24 hours
"payload": {
"report_type": "sales_summary",
"recipients": ["team@company.com"]
}
}
)
Wrapping Up
With this event system, your workflows can spring to life at just the right moment. The persistent storage ensures your event subscriptions survive system restarts, while signature verification keeps your webhooks secure. Whether it’s responding to API calls, webhooks from external services, or running on a schedule, you’ve got the tools to make your state machine truly event-driven.
Remember, the best automation is the kind you don’t have to think about - it just happens when it should!