Advanced Nodes: Parallel/Batch

Beyond the Basics: Supercharging Your Nodes

Sometimes a simple node just doesn’t cut it. Whether you need to fetch data from multiple APIs simultaneously or process a batch of items, the Node class has got your back. Let’s explore how to take your nodes from “meh” to “magnificent”!

Embracing Asynchrony: The Async Magic

Any of the core node methods - prepare, execute, or cleanup - can be async. Just slap on that async keyword and you’re good to go:

import asyncio

class AsyncFetchNode(Node):
    async def execute(self, prepared_result):
        # Look at me, I'm async!
        await asyncio.sleep(1)  # Simulate network delay
        return {"data": "Fetched asynchronously"}

The workflow engine knows how to handle these async methods and will await them properly. No need to worry about the plumbing!
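Curious what that plumbing looks like? Here's a minimal sketch of how an engine might dispatch either flavor (the maybe_await helper is our illustration, not part of the framework):

import inspect

async def maybe_await(value):
    # Async methods return a coroutine; await it to get the real result.
    # Sync methods return the result directly; pass it through unchanged.
    if inspect.isawaitable(value):
        return await value
    return value

# Inside the engine's run loop, something like:
#   result = await maybe_await(node.execute(prepared_result))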

Going Parallel: Multi-tasking Like a Pro

Want to run multiple operations at the same time? Extend the _execute_with_retry method:

class ParallelNode(Node):
    async def prepare(self, shared, request_input):
        # Return a list of items to process in parallel
        return [{"id": i} for i in range(5)]
        
    async def _execute_with_retry(self, items):
        # Schedule execute() for every item and run them concurrently
        tasks = [self.execute(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # gather() collected exceptions instead of raising them;
        # re-raise the first one so the node still fails fast
        for result in results:
            if isinstance(result, Exception):
                raise result
                
        return results
        
    async def execute(self, item):
        # Process a single item
        await asyncio.sleep(1)  # Simulate work
        return f"Processed item {item['id']}"
        
    def cleanup(self, shared, prepared_result, execution_result):
        # Initialize results list if it doesn't exist
        if "parallel_results" not in shared:
            shared["parallel_results"] = []
        
        # Append new results to existing list
        shared["parallel_results"].extend(execution_result)
        return execution_result

This node processes all items concurrently: five one-second tasks finish in about one second instead of five, making your workflow zip along at warp speed!
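To see it for yourself, you can drive the node's lifecycle by hand outside the engine (the empty shared dict and None request input are just placeholder arguments):

import asyncio

async def main():
    node = ParallelNode()
    shared = {}
    items = await node.prepare(shared, None)
    results = await node._execute_with_retry(items)
    node.cleanup(shared, items, results)
    print(shared["parallel_results"])  # all 5 items, done in ~1 second total

asyncio.run(main())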

Batch Processing: Same Task, Different Data

Don’t need the complexity of parallelism but want to process multiple items? Batch processing is your friend:

class BatchNode(Node):
    def prepare(self, shared, request_input):
        # Return batch of items to process
        return [f"item-{i}" for i in range(10)]
    
    async def _execute_with_retry(self, items):
        # Process the batch sequentially, delegating each item to the
        # base class's retry wrapper so per-item retries still apply
        results = []
        for item in items:
            result = await super()._execute_with_retry(item)
            results.append(result)
        return results
        
    def execute(self, item):
        # Process a single item
        return f"Processed {item}"
        
    def cleanup(self, shared, prepared_result, execution_result):
        # Initialize results list if needed
        if "batch_results" not in shared:
            shared["batch_results"] = []
            
        # Add latest batch results
        shared["batch_results"].extend(execution_result)
        return execution_result
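
One subtlety worth noticing: cleanup() extends the shared list rather than replacing it, so results accumulate across runs. A hand-driven check (assuming, as the override above does, that the base class's retry wrapper accepts a single prepared item) makes that visible:

import asyncio

async def main():
    node = BatchNode()
    shared = {}
    for _ in range(2):  # run the same node twice against one shared store
        items = node.prepare(shared, None)  # prepare() is sync in this node
        results = await node._execute_with_retry(items)
        node.cleanup(shared, items, results)
    print(len(shared["batch_results"]))  # 20: two runs of 10 items each

asyncio.run(main())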

Error Handling in Advanced Nodes

Currently, if any task in a parallel or batch node fails, the entire node fails. This “fail fast” approach keeps things simple and predictable.

class ParallelWithErrorNode(ParallelNode):
    async def prepare(self, shared, request_input):
        return [1, 2, 0, 4]  # That zero will cause trouble!
        
    async def execute(self, item):
        # This will raise ZeroDivisionError for item == 0
        return 10 / item
        
    def exec_fallback(self, prepared_result, exception):
        # This runs if execute (and any retries) ultimately fails
        return f"Failed to process items: {exception}"

If you need more sophisticated error handling (like continuing despite errors in some items), feel free to raise an issue on GitHub. We’re always looking to improve!
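Until then, you can roll your own inside _execute_with_retry. Here's one hedged sketch that keeps the successes and reports the failures instead of failing fast (the print is just for illustration; a real node might use a proper logger or stash failures in shared):

import asyncio

class TolerantParallelNode(ParallelWithErrorNode):
    async def _execute_with_retry(self, items):
        results = await asyncio.gather(
            *(self.execute(item) for item in items),
            return_exceptions=True,
        )
        # Keep the good results, report the bad ones, and carry on
        failures = [r for r in results if isinstance(r, Exception)]
        if failures:
            print(f"Skipped {len(failures)} failing item(s): {failures}")
        return [r for r in results if not isinstance(r, Exception)]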

Remember: these advanced nodes follow the same lifecycle as basic nodes, just with superpowers. Mix and match these techniques to create workflows that handle real-world complexity with grace!
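
As one mix-and-match example, here's a hedged sketch of a middle ground between the parallel and batch styles: concurrent execution with a cap on in-flight items, using asyncio.Semaphore (the limit of 3 is arbitrary):

import asyncio

class BoundedParallelNode(Node):
    async def prepare(self, shared, request_input):
        return [{"id": i} for i in range(10)]

    async def _execute_with_retry(self, items):
        # At most 3 items run at once; the rest wait their turn
        semaphore = asyncio.Semaphore(3)

        async def bounded(item):
            async with semaphore:
                return await self.execute(item)

        return await asyncio.gather(*(bounded(item) for item in items))

    async def execute(self, item):
        await asyncio.sleep(1)  # Simulate work
        return f"Processed item {item['id']}"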