Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize database usage #11886

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/core/app/apps/advanced_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,25 @@ def _generate_worker(
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)

workflow = (
db.session.query(Workflow)
.filter(
Workflow.tenant_id == application_generate_entity.app_config.tenant_id,
Workflow.app_id == application_generate_entity.app_config.app_id,
Workflow.id == application_generate_entity.app_config.workflow_id,
)
.first()
)
if not workflow:
raise ValueError("Workflow not initialized")

# chatbot app
runner = AdvancedChatAppRunner(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
workflow=workflow,
dialogue_count=self._dialogue_count,
)

Expand Down
60 changes: 24 additions & 36 deletions api/core/app/apps/advanced_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
from core.workflow.enums import SystemVariableKey
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models import Workflow
from models.enums import UserFrom
from models.model import App, Conversation, EndUser, Message
from models.model import Conversation, Message
from models.workflow import ConversationVariable, WorkflowType

logger = logging.getLogger(__name__)
Expand All @@ -35,38 +36,24 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):

def __init__(
self,
*,
application_generate_entity: AdvancedChatAppGenerateEntity,
queue_manager: AppQueueManager,
conversation: Conversation,
message: Message,
dialogue_count: int,
workflow: Workflow,
) -> None:
super().__init__(queue_manager)
super().__init__(queue_manager=queue_manager)

self.application_generate_entity = application_generate_entity
self.conversation = conversation
self.message = message
self._dialogue_count = dialogue_count
self.workflow = workflow

def run(self) -> None:
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)

app_record = db.session.query(App).filter(App.id == app_config.app_id).first()
if not app_record:
raise ValueError("App not found")

workflow = self.get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
if not workflow:
raise ValueError("Workflow not initialized")

user_id = None
if self.application_generate_entity.invoke_from in {InvokeFrom.WEB_APP, InvokeFrom.SERVICE_API}:
end_user = db.session.query(EndUser).filter(EndUser.id == self.application_generate_entity.user_id).first()
if end_user:
user_id = end_user.session_id
else:
user_id = self.application_generate_entity.user_id
app_config = cast(AdvancedChatAppConfig, self.application_generate_entity.app_config)

workflow_callbacks: list[WorkflowCallback] = []
if dify_config.DEBUG:
Expand All @@ -75,7 +62,7 @@ def run(self) -> None:
if self.application_generate_entity.single_iteration_run:
# if only single iteration run is requested
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
workflow=workflow,
workflow=self.workflow,
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=self.application_generate_entity.single_iteration_run.inputs,
)
Expand All @@ -86,7 +73,6 @@ def run(self) -> None:

# moderation
if self.handle_input_moderation(
app_record=app_record,
app_generate_entity=self.application_generate_entity,
inputs=inputs,
query=query,
Expand All @@ -96,7 +82,6 @@ def run(self) -> None:

# annotation reply
if self.handle_annotation_reply(
app_record=app_record,
message=self.message,
query=query,
app_generate_entity=self.application_generate_entity,
Expand All @@ -116,7 +101,7 @@ def run(self) -> None:
ConversationVariable.from_variable(
app_id=self.conversation.app_id, conversation_id=self.conversation.id, variable=variable
)
for variable in workflow.conversation_variables
for variable in self.workflow.conversation_variables
]
session.add_all(conversation_variables)
# Convert database entities to variables.
Expand All @@ -129,7 +114,7 @@ def run(self) -> None:
SystemVariableKey.QUERY: query,
SystemVariableKey.FILES: files,
SystemVariableKey.CONVERSATION_ID: self.conversation.id,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.USER_ID: self.application_generate_entity.user_id,
SystemVariableKey.DIALOGUE_COUNT: self._dialogue_count,
SystemVariableKey.APP_ID: app_config.app_id,
SystemVariableKey.WORKFLOW_ID: app_config.workflow_id,
Expand All @@ -140,23 +125,23 @@ def run(self) -> None:
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=inputs,
environment_variables=workflow.environment_variables,
environment_variables=self.workflow.environment_variables,
conversation_variables=conversation_variables,
)

# init graph
graph = self._init_graph(graph_config=workflow.graph_dict)
graph = self._init_graph(graph_config=self.workflow.graph_dict)

db.session.close()

# RUN WORKFLOW
workflow_entry = WorkflowEntry(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
workflow_type=WorkflowType.value_of(workflow.type),
tenant_id=self.workflow.tenant_id,
app_id=self.workflow.app_id,
workflow_id=self.workflow.id,
workflow_type=WorkflowType.value_of(self.workflow.type),
graph=graph,
graph_config=workflow.graph_dict,
graph_config=self.workflow.graph_dict,
user_id=self.application_generate_entity.user_id,
user_from=(
UserFrom.ACCOUNT
Expand All @@ -177,7 +162,6 @@ def run(self) -> None:

def handle_input_moderation(
self,
app_record: App,
app_generate_entity: AdvancedChatAppGenerateEntity,
inputs: Mapping[str, Any],
query: str,
Expand All @@ -186,7 +170,7 @@ def handle_input_moderation(
try:
# process sensitive_word_avoidance
_, inputs, query = self.moderation_for_inputs(
app_id=app_record.id,
app_id=app_generate_entity.app_config.app_id,
tenant_id=app_generate_entity.app_config.tenant_id,
app_generate_entity=app_generate_entity,
inputs=inputs,
Expand All @@ -200,10 +184,14 @@ def handle_input_moderation(
return False

def handle_annotation_reply(
self, app_record: App, message: Message, query: str, app_generate_entity: AdvancedChatAppGenerateEntity
self,
message: Message,
query: str,
app_generate_entity: AdvancedChatAppGenerateEntity,
) -> bool:
annotation_reply = self.query_app_annotations_to_reply(
app_record=app_record,
app_id=app_generate_entity.app_config.app_id,
tenant_id=app_generate_entity.app_config.tenant_id,
message=message,
query=query,
user_id=app_generate_entity.user_id,
Expand Down
3 changes: 2 additions & 1 deletion api/core/app/apps/agent_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def run(
if query:
# annotation reply
annotation_reply = self.query_app_annotations_to_reply(
app_record=app_record,
app_id=app_record.id,
tenant_id=app_config.tenant_id,
message=message,
query=query,
user_id=application_generate_entity.user_id,
Expand Down
9 changes: 7 additions & 2 deletions api/core/app/apps/base_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def fill_in_inputs_from_external_data_tools(
)

def query_app_annotations_to_reply(
self, app_record: App, message: Message, query: str, user_id: str, invoke_from: InvokeFrom
self, app_id: str, tenant_id: str, message: Message, query: str, user_id: str, invoke_from: InvokeFrom
) -> Optional[MessageAnnotation]:
"""
Query app annotations to reply
Expand All @@ -422,5 +422,10 @@ def query_app_annotations_to_reply(
"""
annotation_reply_feature = AnnotationReplyFeature()
return annotation_reply_feature.query(
app_record=app_record, message=message, query=query, user_id=user_id, invoke_from=invoke_from
app_id=app_id,
tenant_id=tenant_id,
message=message,
query=query,
user_id=user_id,
invoke_from=invoke_from,
)
3 changes: 2 additions & 1 deletion api/core/app/apps/chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def run(
if query:
# annotation reply
annotation_reply = self.query_app_annotations_to_reply(
app_record=app_record,
app_id=app_record.id,
tenant_id=app_config.tenant_id,
message=message,
query=query,
user_id=application_generate_entity.user_id,
Expand Down
13 changes: 12 additions & 1 deletion api/core/app/apps/workflow/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,22 @@ def _generate_worker(
var.set(val)
with flask_app.app_context():
try:
# workflow app
workflow = (
db.session.query(Workflow)
.filter(
Workflow.tenant_id == application_generate_entity.app_config.tenant_id,
Workflow.app_id == application_generate_entity.app_config.app_id,
Workflow.id == application_generate_entity.app_config.workflow_id,
)
.first()
)
if not workflow:
raise ValueError("Workflow not initialized")
runner = WorkflowAppRunner(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
workflow_thread_pool_id=workflow_thread_pool_id,
workflow=workflow,
)

runner.run()
Expand Down
53 changes: 16 additions & 37 deletions api/core/app/apps/workflow/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models import Workflow
from models.enums import UserFrom
from models.model import App, EndUser
from models.workflow import WorkflowType

logger = logging.getLogger(__name__)
Expand All @@ -28,18 +27,17 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):

def __init__(
self,
*,
application_generate_entity: WorkflowAppGenerateEntity,
queue_manager: AppQueueManager,
workflow_thread_pool_id: Optional[str] = None,
workflow: Workflow,
) -> None:
"""
:param application_generate_entity: application generate entity
:param queue_manager: application queue manager
:param workflow_thread_pool_id: workflow thread pool id
"""
super().__init__(queue_manager=queue_manager)

self.application_generate_entity = application_generate_entity
self.queue_manager = queue_manager
self.workflow_thread_pool_id = workflow_thread_pool_id
self.workflow = workflow

def run(self) -> None:
"""
Expand All @@ -48,26 +46,7 @@ def run(self) -> None:
:param queue_manager: application queue manager
:return:
"""
app_config = self.application_generate_entity.app_config
app_config = cast(WorkflowAppConfig, app_config)

user_id = None
if self.application_generate_entity.invoke_from in {InvokeFrom.WEB_APP, InvokeFrom.SERVICE_API}:
end_user = db.session.query(EndUser).filter(EndUser.id == self.application_generate_entity.user_id).first()
if end_user:
user_id = end_user.session_id
else:
user_id = self.application_generate_entity.user_id

app_record = db.session.query(App).filter(App.id == app_config.app_id).first()
if not app_record:
raise ValueError("App not found")

workflow = self.get_workflow(app_model=app_record, workflow_id=app_config.workflow_id)
if not workflow:
raise ValueError("Workflow not initialized")

db.session.close()
app_config = cast(WorkflowAppConfig, self.application_generate_entity.app_config)

workflow_callbacks: list[WorkflowCallback] = []
if dify_config.DEBUG:
Expand All @@ -77,7 +56,7 @@ def run(self) -> None:
if self.application_generate_entity.single_iteration_run:
# if only single iteration run is requested
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
workflow=workflow,
workflow=self.workflow,
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=self.application_generate_entity.single_iteration_run.inputs,
)
Expand All @@ -88,7 +67,7 @@ def run(self) -> None:
# Create a variable pool.
system_inputs = {
SystemVariableKey.FILES: files,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.USER_ID: self.application_generate_entity.user_id,
SystemVariableKey.APP_ID: app_config.app_id,
SystemVariableKey.WORKFLOW_ID: app_config.workflow_id,
SystemVariableKey.WORKFLOW_RUN_ID: self.application_generate_entity.workflow_run_id,
Expand All @@ -97,21 +76,21 @@ def run(self) -> None:
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=inputs,
environment_variables=workflow.environment_variables,
environment_variables=self.workflow.environment_variables,
conversation_variables=[],
)

# init graph
graph = self._init_graph(graph_config=workflow.graph_dict)
graph = self._init_graph(graph_config=self.workflow.graph_dict)

# RUN WORKFLOW
workflow_entry = WorkflowEntry(
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
workflow_type=WorkflowType.value_of(workflow.type),
tenant_id=self.workflow.tenant_id,
app_id=self.workflow.app_id,
workflow_id=self.workflow.id,
workflow_type=WorkflowType.value_of(self.workflow.type),
graph=graph,
graph_config=workflow.graph_dict,
graph_config=self.workflow.graph_dict,
user_id=self.application_generate_entity.user_id,
user_from=(
UserFrom.ACCOUNT
Expand Down
Loading
Loading