Extending the Logging System
Where Do Your Workflows Go When They Sleep?
By default, Grapheteria stores all your workflow logs in the local filesystem. This works great for development, but when you're ready for the big leagues (aka production), you might want something more robust. Maybe you need centralized storage, better querying capabilities, or you just don't trust those pesky local files (we've all accidentally rm -rf'd something important, right?).
Tracking Data: The Memory of Your Workflows
Every workflow maintains a tracking_data structure that contains:
{
    "workflow_id": "your.awesome.workflow",
    "run_id": "20230415_120523_123",
    "steps": [
        # State snapshot after each step execution,
        # each containing everything needed to resume execution
    ]
}
The steps list is particularly magical - it captures the complete execution state after each node runs. This is what enables our workflow to pick up exactly where it left off, even if your server decided to take an unplanned vacation.
At the end of every step, the workflow engine calls self.storage.save_state(), where the storage object is whatever custom backend you provide (it defaults to the local filesystem).
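Conceptually, that call looks something like the sketch below - the attribute names here are illustrative assumptions, not Grapheteria's actual internals:

# Illustrative sketch of the engine-side call; attribute names are assumptions
self.storage.save_state(
    self.workflow_id,    # e.g. "your.awesome.workflow"
    self.run_id,         # e.g. "20230415_120523_123"
    self.tracking_data   # the full state snapshot described above
)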
Creating Your Own Storage Backend
The StorageBackend abstract class defines the interface any storage implementation must follow:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

class StorageBackend(ABC):
    @abstractmethod
    def save_state(self, workflow_id: str, run_id: str, save_data: dict) -> None:
        """Save the workflow execution state."""
        pass

    @abstractmethod
    def load_state(self, workflow_id: str, run_id: str) -> Optional[Dict]:
        """Load a workflow execution state."""
        pass

    # Optional methods for easier resumability
    def list_runs(self, workflow_id: str) -> List[str]:
        """List all runs for a given workflow."""
        pass

    def list_workflows(self) -> List[str]:
        """List all workflows."""
        pass
Let’s break down these methods:
- save_state: Stores the workflow state with its unique identifiers
- load_state: Retrieves the state of a specific workflow run
- list_runs: Gets all runs for a specific workflow (useful for history/debugging)
- list_workflows: Lists all available workflows in the storage
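Before reaching for a real database, a minimal in-memory backend is a handy way to see all four methods at once. This is a sketch for tests and experiments only - everything vanishes when the process exits:

from typing import Dict, List, Optional

from grapheteria.utils import StorageBackend

class InMemoryStorage(StorageBackend):
    """Illustrative dict-backed storage backend."""

    def __init__(self):
        # Maps workflow_id -> {run_id: save_data}
        self._store: Dict[str, Dict[str, dict]] = {}

    def save_state(self, workflow_id: str, run_id: str, save_data: dict) -> None:
        self._store.setdefault(workflow_id, {})[run_id] = save_data

    def load_state(self, workflow_id: str, run_id: str) -> Optional[Dict]:
        return self._store.get(workflow_id, {}).get(run_id)

    def list_runs(self, workflow_id: str) -> List[str]:
        return list(self._store.get(workflow_id, {}).keys())

    def list_workflows(self) -> List[str]:
        return list(self._store.keys())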
Example: SQLite Storage Implementation
Here’s an example of implementing SQLite storage - observe how it satisfies the same interface:
import json
import sqlite3
from contextlib import contextmanager
from typing import Dict, Optional

from grapheteria.utils import StorageBackend

class SQLiteStorage(StorageBackend):
    """SQLite implementation of storage backend."""

    def __init__(self, db_path: str = "workflows.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS workflow_states (
                    workflow_id TEXT,
                    run_id TEXT,
                    state_json TEXT,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (workflow_id, run_id)
                )
            ''')
            conn.commit()

    @contextmanager
    def _get_connection(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()

    def save_state(self, workflow_id: str, run_id: str, save_data: dict) -> None:
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                '''
                INSERT OR REPLACE INTO workflow_states (workflow_id, run_id, state_json, updated_at)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP)
                ''',
                (workflow_id, run_id, json.dumps(save_data))
            )
            conn.commit()

    def load_state(self, workflow_id: str, run_id: str) -> Optional[Dict]:
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT state_json FROM workflow_states WHERE workflow_id = ? AND run_id = ?",
                (workflow_id, run_id)
            )
            row = cursor.fetchone()
            if not row:
                return None
            return json.loads(row[0])
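The two optional listing methods map naturally onto SQL queries. Here's a sketch that would slot into the class above (add List to the typing import):

    def list_runs(self, workflow_id: str) -> List[str]:
        """List all runs for a given workflow, newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT run_id FROM workflow_states WHERE workflow_id = ? ORDER BY updated_at DESC",
                (workflow_id,)
            )
            return [row[0] for row in cursor.fetchall()]

    def list_workflows(self) -> List[str]:
        """List all workflows that have stored state."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT DISTINCT workflow_id FROM workflow_states")
            return [row[0] for row in cursor.fetchall()]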
Using the SQLite backend is as simple as:
# Initialize the workflow engine with SQLite storage
storage = SQLiteStorage("production.db")
engine = WorkflowEngine(
    workflow_id="my.workflow",
    storage_backend=storage
)
# Now all state is stored in SQLite!
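Since the backend is just an object, you can also query it directly - handy for debugging a past run (the run_id below reuses the example format from earlier):

# Inspect a stored run directly through the backend
state = storage.load_state("my.workflow", "20230415_120523_123")
if state is None:
    print("No such run recorded")
else:
    print(f"Run progressed through {len(state['steps'])} steps")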
Creating Your Own Storage
Want to store workflows in Redis, MongoDB, or your secret underground bunker's mainframe? Just implement these methods (only save_state and load_state are strictly required) and you're good to go!
Here’s a stub for your next storage adventure:
class MyAwesomeStorage(StorageBackend):
    def __init__(self, connection_string):
        # Connect to your storage service
        self.client = AwesomeStorageClient(connection_string)

    def save_state(self, workflow_id, run_id, save_data):
        # Your code to save state
        self.client.upsert_document(
            collection="workflows",
            key=f"{workflow_id}:{run_id}",
            data=save_data
        )

    # Implement the other required methods...
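For symmetry, here's what a matching load_state might look like - AwesomeStorageClient and its get_document method are made up, so translate this to your client's real API:

    def load_state(self, workflow_id, run_id):
        # Hypothetical client call; should return None when nothing is stored
        return self.client.get_document(
            collection="workflows",
            key=f"{workflow_id}:{run_id}"
        )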
Now your workflows can live wherever they want - give them the freedom they deserve!