Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add flow_id, self.name for instance names of Flows and Nodes and easy isolated storage for nested flows #5

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

johnr14
Copy link

@johnr14 johnr14 commented Jan 15, 2025

WIP to get instance names and flow names.

For discussion as in Issue #4

THIS IS JUST A DRAFT, NOT TESTED EXTENSIVELY.

Now when inside a prep() , exec() or post(), it's possible to print the instance name with

print(f"My name is: {self.name} (instance of {self.__class__.__name__})")
        if self.flow:
            print(f"Flow name: {self.flow.name}")
            if self.flow.parent:
                print(f"Parent flow: {self.flow.parent.name}")

Will now print :

My name is: opinion2_Node (instance of GetOpinion)
Flow name: rework2_Flow

Todo : get feedback and more ideas + review ?

Questions: This does add complexity and more lines of code... Is it all useful ?

So I think that :

  • Having a way to keep history of flow and logging time per node/flow would be the next step.
  • Have different packages names :
    • pocketflow and pocketflowextra to separate basic and advanced framework that are still somewhat compatible but with missing features ? Users could just cherry pick some parts from extra to add features quickly.
    • pocketflowdev for development and experimental features that may break compatibility ?

Try this code

python rework.py

I added rework.py as example, look at class GetOpinion(Node): for actual use.

EDIT: Now with updated build_mermaid code !

graph LR

    subgraph rework2_Flow["rework2_Flow  (Flow)"]
    load2_Node["load2_Node (LoadFile)"]
    opinion2_Node["opinion2_Node (GetOpinion)"]
    load2_Node --> opinion2_Node
    rework2_Node["rework2_Node (ReworkFile)"]
    opinion2_Node --> rework2_Node
    valid2_Node["valid2_Node (GetValidation)"]
    rework2_Node --> valid2_Node
    valid2_Node --> opinion2_Node
    save2_Node["save2_Node (SaveFile)"]
    valid2_Node --> save2_Node
    rework2_Node --> opinion2_Node
    end
Loading

PR comment generated with AI assistance :

This change introduces several improvements to BaseNode and Flow classes:

  1. Added instance name tracking with get_instance_name()
  2. Added flow hierarchy tracking with flow.name and flow.id
  3. Improved flow name lookup with explicit naming support

These changes enable:

  • Better debugging with named instances and flows
  • Flow hierarchy awareness for nested flows
  • Consistent naming across nodes and flows

The improvements include:

  • Automatic instance name lookup walking up the call stack
  • Flow name tracking with explicit naming support
  • Parent flow tracking for nested flows

This is particularly useful for:

  • Debugging complex flows with named components
  • Tracking flow execution in logs
  • Visualizing flow hierarchies
  • Maintaining backward compatibility

@johnr14
Copy link
Author

johnr14 commented Jan 16, 2025

For performance and tracing flow history, I think a decorator could do.

  • It could provide total cost in tokens for a flow if call_llm returns that information.
  • Total time per flow or per node...

Also I didn't get to the point where I would need to manage concurrent connections, requests per minutes/flow/batch, and token per node/flow... But it would be nice to have it planned at this stage.

Like infinite loop prevention other than logging to shared, some sort of maximum payload per node/flow ?

But am I getting to far from a minimal implementation of PocketFlow ?

Some example decorator from AI:

 def trace_node(debug=False):                                                                                                                                                                                                                                                                                                                                                                 
     def decorator(func):                                                                                                                                                                                                                                                                                                                                                                     
         if not debug:                                                                                                                                                                                                                                                                                                                                                                        
             # Return original function if debug is disabled                                                                                                                                                                                                                                                                                                                                  
             return func                                                                                                                                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                                                                                                              
         def wrapper(node, *args, **kwargs):                                                                                                                                                                                                                                                                                                                                                  
             # Initialize trace if not exists                                                                                                                                                                                                                                                                                                                                                 
             if not hasattr(node, '_trace'):                                                                                                                                                                                                                                                                                                                                                  
                 node._trace = []                                                                                                                                                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                                                                                                                              
             # Create trace entry                                                                                                                                                                                                                                                                                                                                                             
             trace_entry = {                                                                                                                                                                                                                                                                                                                                                                  
                 'timestamp': time.time(),                                                                                                                                                                                                                                                                                                                                                    
                 'node': node.name,                                                                                                                                                                                                                                                                                                                                                           
                 'action': func.__name__,                                                                                                                                                                                                                                                                                                                                                     
                 'args': args,                                                                                                                                                                                                                                                                                                                                                                
                 'kwargs': kwargs,                                                                                                                                                                                                                                                                                                                                                            
                 'status': 'started'                                                                                                                                                                                                                                                                                                                                                          
             }                                                                                                                                                                                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                                                                                                                              
             # Add initial entry                                                                                                                                                                                                                                                                                                                                                              
             node._trace.append(trace_entry)                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                              
             try:                                                                                                                                                                                                                                                                                                                                                                             
                 # Execute the original method                                                                                                                                                                                                                                                                                                                                                
                 result = func(node, *args, **kwargs)                                                                                                                                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                                                                                                                              
                 # Update entry with success status                                                                                                                                                                                                                                                                                                                                           
                 trace_entry.update({                                                                                                                                                                                                                                                                                                                                                         
                     'status': 'success',                                                                                                                                                                                                                                                                                                                                                     
                     'duration': time.time() - trace_entry['timestamp'],                                                                                                                                                                                                                                                                                                                      
                     'result': result                                                                                                                                                                                                                                                                                                                                                         
                 })                                                                                                                                                                                                                                                                                                                                                                           
                 return result                                                                                                                                                                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                                                                                                                              
             except Exception as e:                                                                                                                                                                                                                                                                                                                                                           
                 # Update entry with error status                                                                                                                                                                                                                                                                                                                                             
                 trace_entry.update({                                                                                                                                                                                                                                                                                                                                                         
                     'status': 'error',                                                                                                                                                                                                                                                                                                                                                       
                     'duration': time.time() - trace_entry['timestamp'],                                                                                                                                                                                                                                                                                                                      
                     'exception': str(e),                                                                                                                                                                                                                                                                                                                                                     
                     'traceback': traceback.format_exc()                                                                                                                                                                                                                                                                                                                                      
                 })                                                                                                                                                                                                                                                                                                                                                                           
                 raise                                                                                                                                                                                                                                                                                                                                                                        
             finally:                                                                                                                                                                                                                                                                                                                                                                         
                 # Finalize entry                                                                                                                                                                                                                                                                                                                                                             
                 trace_entry['end_time'] = time.time()                                                                                                                                                                                                                                                                                                                                        
                                                                                                                                                                                                                                                                                                                                                                                              
         return wrapper                                                                                                                                                                                                                                                                                                                                                                       
     return decorator   
...
DEBUG = True  # Set this globally or via configuration                                                                                                                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                                                                                                                                              
 class Node(BaseNode):                                                                                                                                                                                                                                                                                                                                                                        
     @trace_node(debug=DEBUG)                                                                                                                                                                                                                                                                                                                                                                 
     def prep(self, shared):  
...
 # Access tracing information                                                                                                                                                                                                                                                                                                                                                                 
 for node in flow.get_all_nodes():                                                                                                                                                                                                                                                                                                                                                            
     print(f"Node: {node.name}")                                                                                                                                                                                                                                                                                                                                                              
     for entry in node._trace:                                                                                                                                                                                                                                                                                                                                                                
         print(f"  {entry['action']}: {entry['status']} in {entry['duration']:.2f}s")    

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant