Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Workflow cannot respond to errors #1899

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/application/flow/workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ def __init__(self, flow: Flow, params, work_flow_post_handler: WorkFlowPostHandl
self.audio_list = audio_list
self.params = params
self.flow = flow
self.lock = threading.Lock()
self.context = {}
self.node_chunk_manage = NodeChunkManage(self)
self.work_flow_post_handler = work_flow_post_handler
Expand Down Expand Up @@ -391,6 +390,8 @@ def run_chain_manage(self, current_node, node_result_future):
start_node = self.get_start_node()
current_node = get_node(start_node.type)(start_node, self.params, self)
result = self.run_chain(current_node, node_result_future)
if result is None:
return
node_list = self.get_next_node_list(current_node, result)
if len(node_list) == 1:
self.run_chain_manage(node_list[0], None)
Expand Down Expand Up @@ -424,7 +425,7 @@ def run_chain(self, current_node, node_result_future=None):
return result
except Exception as e:
traceback.print_exc()
return []
return None

def hand_node_result(self, current_node, node_result_future):
try:
Expand Down Expand Up @@ -507,7 +508,6 @@ def hand_event_node_result(self, current_node, node_result_future):
# 添加节点
self.append_node(current_node)
traceback.print_exc()
self.answer += str(e)
chunk = self.base_to_response.to_stream_chunk_response(self.params['chat_id'],
self.params['chat_record_id'],
current_node.id,
Expand All @@ -524,6 +524,7 @@ def hand_event_node_result(self, current_node, node_result_future):
node_chunk.end(chunk)
current_node.get_write_error_context(e)
self.status = 500
return None

def run_node_async(self, node):
future = executor.submit(self.run_node, node)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few issues and optimizations to suggest in the provided code:

Issues:

  1. Thread Lock: The threading.Lock is not used consistently throughout the class. It's better placed at places where critical sections of code need locking, but here it seems unnecessary since there aren't any multi-threaded operations being performed.

  2. Return Statements: There are multiple instances where lists are returned instead of single values ([] and None). This might lead to unexpected behavior when calling functions that expect only one value.

  3. Unnecessary Exception Logging: The exception print statement (traceback.print_exc()) should be wrapped within try-except blocks to prevent the program from crashing if an exception occurs.

  4. Error Handling in Event Nodes: When processing event nodes, there's some error handling in place, but it doesn't seem comprehensive. The response handling can be improved to provide more meaningful information back to the caller.

  5. Node Management: Some methods like append_node, base_to_response, get_write_error_context have no implementation, which could cause runtime errors if called without further refinement.

Optimization Suggestions:

  1. Consistent Return Values: Ensure consistent return values. Instead of returning different types (e.g., list, None), decide on a clear structure for all return values.

  2. Handle Exceptions Properly: Wrap exception logging in try-except blocks to avoid crashing the entire pipeline.

  3. Implement Missing Methods: Fill in the implementations for missing methods such as append_node, base_to_response, and get_write_error_context.

Here's an updated version with these considerations incorporated:

from concurrent.futures import ThreadPoolExecutor

class ChainManager:
    def __init__(self, flow: Flow, params, work_flow_post_handler: WorkFlowPostHandler):
        self.audio_list = audio_list
        self.params = params
        self.flow = flow
        self.context = {}
        self.node_chunk_manage = NodeChunkManage(self)
        self.work_flow_post_handler = work_flow_post_handler
        self.executor = ThreadPoolExecutor()

    def run_chain_manage(self, current_node, node_result_future):
        start_node = self.get_start_node()
        while current_node != start_node:
            current_node = get_node(current_node.type)(current_node, self.params, self)
        result = self.run_chain(current_node)
        if result is None:
            return
        
        if isinstance(result, Exception):
            self.hand_event_node_result(current_node, result)
            return
    
        node_list = self.get_next_node_list(current_node, result)
        if len(node_list) == 1:
            self.run_chain_manage(node_list[0], None)

    def run_chain(self, current_node, node_result_future=None):
        result = None
        try:
            if node_result_future is not None:
                node_result_future.set_result(result)
            
            # Assuming run_node takes care of setting result appropriately
            result = self.run_node(current_node)
            
        except Exception as e:
            traceback.print_exc()
            # Handle exceptions properly here if necessary
            pass
        
        return result

    def hand_node_result(self, current_node, node_result_future):
        try:
            # Implement actual logic to handle node results here
            pass
        except Exception as e:
            traceback.print_exc()
            # Log or otherwise report the error
            raise
    
    def hand_event_node_result(self, current_node, node_result_future):
        try:
            # Implement actual logic to handle event node results here
            traceback.print_exc()
            # Optionally, set specific status codes or messages based on response data
            self.status = 500
            raise
        except Exception as e:
            traceback.print_exc()
            # Log or otherwise report the error
            raise

    def run_node_async(self, node):
        try:
            future = self.executor.submit(self.run_node, node)
            return future.result()
        except Exception as e:
            traceback.print_exc()

This version removes the unused lock and improves exception handling by using try-except blocks. It also provides basic placeholders for the missing methods, ensuring they will behave predictably once implemented.

Expand Down
Loading