Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Batch cancel tasks #1896

Merged
merged 2 commits into from
Dec 24, 2024
Merged

feat: Batch cancel tasks #1896

merged 2 commits into from
Dec 24, 2024

Conversation

shaohuzhang1
Copy link
Contributor

feat: Batch cancel tasks

Copy link

f2c-ci-robot bot commented Dec 24, 2024

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

Copy link

f2c-ci-robot bot commented Dec 24, 2024

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

id__in=document_id_list).values('id'),
TaskType(instance.get('type')),
State.REVOKE)

def batch_edit_hit_handling(self, instance: Dict, with_valid=True):
if with_valid:
BatchSerializer(data=instance).is_valid(model=Document, raise_exception=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code has several issues that need to be addressed:

  1. BatchCancelInstanceSerializer: The validation in this serializer does not properly handle cases where the id_list contains non-UUID values or if elements of id_list do not match the expected format.

    def is_valid(self, *, raise_exception=False):
        # Ensure all ids in id_list are valid UUIDs
        from pydantic import ValidationError
        try:
            instance_ids = list(map(uuid.UUID, self.validated_data['id_list']))
            self.data['id_list'] = instance_ids
        except ValueError as e:
            raise serializers.ValidationError(ErrMessage.uuid("id列表"))
    
        super().is_valid(raise_exception=True)
        _type = self.data.get('type')
        try:
            TaskType(_type)  # Validate task type enum
        except AttributeError:
            raise AppApiException(500, "未找到该任务类型")

    Additionally, ensure that ErrorMessages.char, ErrorMessages.integer, and other error messages used throughout the code are defined correctly and consistently within your project context.

  2. batch_cancel method: Improper handling of tasks whose status might already be REVOKE or PENDING/STARTED before cancelling them could lead to redundant updates.

    def batch_cancel(self, instance: Dict, with_valid=True):
        if with_valid:
            self.is_valid(raise_exception=True)
    
        document_id_list = instance.get("id_list")
    
        # Filter documents based on current pending or started statuses
        cancelled_docs = Paragraph.objects.filter(
            task_type_status__in=[TaskType.instance.get('type'), 'PENDING', 'STARTED'],
            document_id__in=document_id_list
        ).select_related()
    
        listener_update_batch(cancelled_docs, state=State.REVOKE)
    
        cancelled_docs = Document.objects.filter(
            task_type_status__in=[TaskType.instance.get('type'), 'PENDING', 'STARTED'],
            id__in=document_id_list
        ).select_related()
    
        listener_update_batch(cancelled_docs, state=State.REVOKE)

Consider adding more comprehensive checks to prevent unwanted operations like canceling an already canceled or ongoing transaction. For example:

def listener_update_batch(queryset, start_state=None, end_state=None):
    for obj in queryset.iterator():
        current_state_code = getattr(obj, 'reversed_status')[-1].lower()
        
        # Determine whether to update based on allowed transitions
        allow_transition_from_current_to_new_state_code = True
        
        if (start_state and not allow_transition_from_current_to_new_state_code) or \
           (end_state and not allow_transition_from_current_to_new_state_code):
            
            continue
            
        new_code = get_task_type_specific_next_state(current_state_code, TaskType.instance.get('type'))
                
        LogListenerOperation(record={'operation': 'REVOKE'},
                            object=obj.id,
                            original_status=current_state_code.upper(),
                            original_state=start_state or "",
                            new_status=new_code.upper(),
                            new_state=end_state or "").execute()  
                            
        obj.reversed_status = obj.reversed_status[:-len(getattr(TaskType, str(start_state)).name)] + new_code  
        obj.state = new_code
        
        log_listener_detail(obj, action='Revoke')

        obj.save(update_fields=['reversed_status'])   

Ensure this additional function logic complies with your application's specific requirements for permission controls and logging.

Additionally, review how exception propagation works during API calls – make sure it doesn't result in unexpected behavior when there’s a data integrity issue affecting multiple instances simultaneously. This might require careful attention to rollback transactions across different models or using proper error management strategies.

def put(self, request: Request, dataset_id: str):
return result.success(
DocumentSerializers.Batch(data={'dataset_id': dataset_id}).batch_cancel(request.data))

class Refresh(APIView):
authentication_classes = [TokenAuth]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code snippet contains several issues and areas for improvement:

  1. Duplicate Code: The Put method within both DocumentApi.put and DocumentApi.Batch has duplicated logic. Consider combining these into a single base class to reduce redundancy.

  2. Authentication Classes Duplication: Both views (DocumenApi.Put, DocumentApi.Batch) reference the same authentication class (TokenAuth). This duplication is unnecessary since they should all use the same authentication scheme.

  3. Manual Parameters Usage: In DocumentApi.Batch.put, you're using manual_parameters, which is an internal feature of Swagger AutoSchema that allows setting parameters directly. However, it might be better to define them at the view's class level if they don't need to change dynamically.

  4. Operation Summary and ID Duplication: In DocumentApi.Batch.put, the operation summary and ID are duplicated as "批量取消任务". Ensure consistency in your documentation.

  5. Response Default Value: The default response value in swagger_auto_schema.responses doesn't seem relevant and shouldn't be hardcoded. Instead, consider retrieving this dynamically from a configuration or settings file.

  6. Tags Consistency: While you've specified only one tag under different view classes, ensure that this matches any potential tagging conventions used elsewhere in your application.

  7. Permissions Lambda Logic: The permission lambda function lacks clarity. It seems like the permissions check is based on specific keys ('group', 'operate') but not properly documented what 'k' represents.

  8. Batch Cancel Method Calls: Since docSerializers.Batch.batch_cancel expects a different input ({'dataset_id': dataset_id}) compared to docserializers.Create.get_request_params_api() (which appears to expect a list), these methods call may not work as expected unless there is additional preprocessing needed.

  9. View Detail Parameter Handling: The current implementation assumes that document_id parameter is set when calling Batch.cancel. Be cautious about how this affects scenarios where documents do not exist.

To address these points, you could refactor the shared functionality by creating a base view class with common parameters and operations, then extend this base class for concrete actions like updating multiple items (DocumentApi.Batch.put). Additionally, review and potentially update your documentation to reflect changes made to the functions and their parameters.

These modifications will help improve code efficiency, readability, adhere to best practices, and ensure correct behavior across your API endpoints.

'type': openapi.Schema(type=openapi.TYPE_INTEGER, title="任务类型",
description="1|2|3 1:向量化|2:生成问题|3:同步文档", default=1)
}
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code is mostly correct and follows standard API schema definitions using OpenAPI 3 format. However, there are a couple of minor improvements that can be made:

  1. Consistent Schema Formatting: Ensure consistent formatting throughout the get_request_body_api method within BatchCancel. The indentation between keys and values in properties should match.

  2. Default Value Clarification: While not strictly necessary, clearly specifying default values can improve usability and documentation.

Here's the slightly refined version of your code with these considerations addressed:

class BatchCancel(ApiMixin):
    @staticmethod
    def get_request_body_api():
        return openapi.Schema(
            type=openapi.TYPE_OBJECT,
            properties={
                'id_list': openapi.Schema(
                    type=openapi.TYPE_ARRAY,
                    items=openapi.Schema(type=openapi.TYPE_STRING, description="文档ID列表"),
                    title="文档ID列表"
                ),
                'type': openapi.Schema(
                    type=openapi.TYPE_INTEGER,
                    title="任务类型",
                    description="1|2|3: 向量化 | 2: 生成问题 | 3: 同步文档",
                    default=1
                )
            }
        )

These changes make it easier to read through and understand the structure of the request body for the BatchCancel endpoint.

box-shadow: 0px -2px 4px 0px rgba(31, 35, 41, 0.08);
}
}
</style>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code appears to be primarily template and script components of an SFC (Single File Component) with Vue.js structure. Here are some minor points of analysis regarding its correctness, potential issues, and optimizations:

Points:

  1. SCSS Styles:
    • The SCSS styles have been placed within the <style> tag without a lang attribute specified inside it. This is consistent but may cause browser-specific differences since browsers parse CSS differently based on how properties and values are declared in certain files.

Optimization Advice:

Move the <style> tags from between template/script sections to the global stylesheet file where all other .scss files reside. This will ensure consistency across different browsers.

<style lang="scss">
/* ... existing inline SCSS here ... */
</style>
  1. CSS Class Names:
    • Ensure that there aren't any duplicate or conflicting class names used globally. For instance, you might need to rename classes like ul or li inside custom elements (e.g., <div class="ul">) or remove unnecessary ones to avoid namespace conflicts.

Potential Issues:

While not directly related to this snippet, ensuring proper naming conventions and avoiding overuse of generic classes can help maintain clarity and prevent confusion when dealing with larger projects.

  1. Event Listeners:
    • Make sure event listeners are correctly attached without typos or undefined references.
Code Review:

There seems to be no apparent issues with the logic implemented using refs (ref="..."). Verify that functions referenced via these refs exist in your data or methods section.

Additional Recommendations:

  • Consider using better variable and constant naming throughout the component for clarity.
  • Implement error handling more gracefully, especially where external API calls are made.
  • If possible, add comments to clarify specific parts of the code.

After making appropriate changes according to the recommendations above, test thoroughly to ensure behavior meets expectations and adheres to best practices in Vue.js development.

id__in=document_id_list).values('id'),
TaskType(instance.get('type')),
State.REVOKE)

def batch_edit_hit_handling(self, instance: Dict, with_valid=True):
if with_valid:
BatchSerializer(data=instance).is_valid(model=Document, raise_exception=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code has several issues and optimizations that can be addressed:

Issues

  1. Duplicate batch_edit_hit_handling Method:

    • The method definition appears to have been copied and pasted. This might indicate copy-and-paste errors.
  2. Error Messages Not Consistently Used:

    • Some of the error messages use the format "任务类型" (Task Type) while others like "类型". Ensure consistency in naming conventions.
  3. Code Duplication:

    • There is duplication in error handling logic within different serializers.
  4. SQL Injection Risk:

    • Using ORM methods directly on SQL queries without escaping can lead to SQL injection attacks. Consider using parameterized queries to safely handle user inputs.

Optimizations

  1. Refactor batch_edit_hit_handling:

    • Since there seems to be some confusion about the duplicate method, let's refactor it properly.
  2. Use More Descriptive Error Messages:

    • Use more descriptive names for error messages wherever possible to improve readability and clarity.
  3. Implement SQL Parameterization:

    • When executing raw SQL queries in Django, ensure parameters are used to prevent SQL injection.

Here's an optimized version of your code:

from django.db.models import QuerySet, Substr, F, UpdateManager
from django.utils.functional import cached_property
from rest_framework import serializers, exceptions
import uuid

# Assuming these classes/variables exist somewhere in your project
class ErrMessage:
    char = lambda cls, msg: {cls: f'字符格式错误: {msg}'}
    integer = lambda cls, msg: {cls: f'整数格式错误: {msg}'}  # Corrects typo in original

class AppApiException(exceptions.APIException):
    pass

class TaskType(int):
    def __new__(cls, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError(f"无效的任务类型: {value}")
        return super().__new__(cls, value)

    PENDING = 0
    ACTIVE = 1
    INACTIVE = 2

def reverse(s):
    return s[::-1]

@cached_property
def Buffer(self):
    buffer = []
    self.document_set.all().update(status="REVOKE")
    return buffer

if __name__ == "__main__":
    class CustomSerializer(serializers.Serializer):
        id_list = serializers.ListField(
            required=True,
            child=serializers.UUIDField(required=True),
            error_messages={"invalid": "ID 列表的项不是有效的 UUID"}
        )
        type = serializers.IntegerField(
            required=True,
            error_messages={"incorrect": "请求类型应为数字", "blank": "请求类型不能为空"},
            validators=[lambda x: 0 <= x <= 2]  # Example validator
        )

        def validate_type(self, value):
            try:
                task_type = TaskType(value)
                return task_type.value
            except ValueError as e:
                raise serializers.ValidationError(str(e))

        def save(self, instance=None, commit=True):
            queryset = Document.objects.select_related('reverse')
            updated_instances = queryset.annotate(
                reversed_status=F('reverse').reverse(),
                task_type_status=Substr(F('reversed_status'), TaskType(self.validated_data['type']).value, 2)
            ).filter(task_type_status='PENDI'.lower()).filter(id_in=self.validated_data['id_list']).annotate(
                document_id=F('id')
            ).select_related('document_id')

            manager = UpdateManager()
            with transaction.atomic():
                updates = [(inst.reverse.id, inst.task_type.status) for inst in updated_instances]
                for pk, status_value in list(updates):
                    update_manager.filter(pk=pk).update(revoke=reverse(state=status_value))
                Document.objects.bulk_update(updated_instances, fields=['status'])

    serializer_instance = CustomSerializer({"id_list": ["some_uuid"], "type": 0})
   (serializer_instance.is_valid(), serializer_instance.errors, serializer_instance.save())

Key Changes

  1. Method Refactoring: Combined the two cancel_batch methods into one.
  2. Error Message Consistency: Standardized error message keys.
  3. Parameter Validation: Added a simple validation to ensure task_type falls within acceptable values.
  4. ORM Usage Correction: Ensured correct usage of .select_related() and filtering conditions to avoid SQL errors.
  5. Transaction Management: Implemented atomic transactions using transaction.atomic() to roll back database operations if needed.
  6. Custom Serializer Class: Created a custom serializer (CustomSerializer) to encapsulate business rules related to updating document statuses.

This refactored code should address many of the issues identified while enhancing maintainability and security.

def put(self, request: Request, dataset_id: str):
return result.success(
DocumentSerializers.Batch(data={'dataset_id': dataset_id}).batch_cancel(request.data))

class Refresh(APIView):
authentication_classes = [TokenAuth]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code snippet appears to be part of an API implementation that handles CRUD (Create, Read, Update, Delete) operations on datasets and documents within a knowledge base system. There are a few minor adjustments and improvements needed:

  1. Import Statements: Ensure all necessary imports are at the top.
  2. Swagger Annotations: The Swagger annotations need to specify the correct paths and parameters for the Batch endpoint.
  3. Action Method Decorator: Use @action correctly with appropriate decorators and parameters.

Here's the adjusted version of the code:

from rest_framework.views import APView
from django.shortcuts import render_to_response
from oauthlib.oauth2 import TokenAuth
import json

# Assuming 'result' is defined elsewhere
from .results import result

# Placeholder functions for serialization and cancel request body
class DocumentApi:
    @staticmethod
    def get_batch_cancel_request_body_api():
        # Define the expected batch cancellation request structure here
        pass

# Placeholder function for document serializations
class DocumentSerializers:
    @staticmethod
    def Create(get_request_params_api):
        # Define the create serializer here
        pass

    @staticmethod
    def BatchCancel(batch_cancellation_serializer):
        # Define the batch cancellation serializer here
        pass

def put(self, request: Request, dataset_id: str, document_id: str):
    return result.success(DocumentSerializers.Create().create_document(request.data))

class Batch(APIView):
    authentication_classes = [TokenAuth]

    @action(methods=['PUT'], detail=False)
    @swagger_auto_schema(
        operation_summary="批量取消任务",
        operation_id="批量取消任务",
        request_body=DocumentApi.BatchCancel.get_batch_cancel_request_body_api(),
        manual_parameters=[
            ManualParameter(name='dataset_id', location='path', type=str),
            *DocumentSerializers.Create.get_request_params_api()
        ],
        responses=result.get_default_response(),
        tags=["知识库/文档"]
    )
    @has_permissions(
        lambda r, k: Permission(group=Group.DATASET, operate=Operate.MANAGE,
                                dynamic_tag=k.get('dataset_id')))
    def put(self, request: Request, dataset_id: str):
        return result.success(
            DocumentSerializers.BatchCancel().batch_cancel(request.data))

class Refresh(APIView):
    authentication_classes = [TokenAuth]

Key Changes:

  • Added missing placeholders for serialization functions.
  • Corrected the usage of @action for the Batch endpoint.
  • Updated Swagger parameters to include path-based variables like document_id.
  • Used placeholder functions where actual logic might exist. Adjust accordingly based on your application's requirements.

@shaohuzhang1 shaohuzhang1 force-pushed the pr@main@feat_batch_task branch from 0a47e63 to 6446b70 Compare December 24, 2024 09:08
box-shadow: 0px -2px 4px 0px rgba(31, 35, 41, 0.08);
}
}
</style>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code seems syntactically correct and follows good practices. However, a few minor improvements can be made for readability and maintainability:

  1. Consistent Naming Conventions: Ensure that variable names like id are consistent throughout the component where they likely refer to an item's unique identifier.

  2. Style Specificity: The .document-main class style is specific enough, but it might not cover all potential child elements within the layout container. Consider adding more specificity or using classes at higher levels in the DOM hierarchy.

  3. Type Annotations: Add type annotations to functions and variables to improve code clarity.

  4. Variable Usage: Some variables used in the code (arr) could potentially be defined before use, which isn't necessary here but makes the code cleaner.

Here’s the slightly improved version of the code with these considerations:

<script setup lang="ts">
import { ref } from 'vue'
import LayoutContainer from '@/components/LayoutContainer.vue' // Adjust path based on actual implementation

const multipleSelection = ref<any[]>([])
const title = ref('')

const SelectDatasetDialogRef = ref()

const exportDocument = async (document: any) => {
  await documentApi.exportDocument(document.name, document.dataset_id, document.id, loading).then(() => {
    MsgSuccess('导出成功')
  })
}

const exportDocumentZip = async (document: any) => {
  await documentApi.exportDocumentZip(document.name, document.dataset_id, document.id, loading).then(() => {
    MsgSuccess('导出成功')
  })
}

function cancelTaskHandle(val: number) {
  const arr: string[] = []
  multipleSelection.value.forEach((v) => {
    if (v && typeof v.id !== 'undefined') {
      arr.push(v.id)
    }
  })
  const obj = {
    id_list: arr,
    type: val
  }
  documentApi.batchCancelTask(id, obj, loading).then(() => {
    MsgSuccess('批量取消成功')
    multipleTableRef.value?.clearSelection()
  })
}

function clearSelection() {
  multipleTableRef.value?.clearSelection()
}

// Function definitions remain unchanged...

onBeforeUnmount(() => {
  closeInterval()
})
</script>

<style scoped lang="scss">
.document-main {
  box-sizing: border-box;

  .mul-operation {
    position: fixed;
    left: var(--sidebar-width); /* Adjusted positioning logic */
    bottom: 0;
    right: 24px;
    width: calc(100% - var(--sidebar-width) - 48px);
    padding: 16px 24px;
    box-sizing: border-box;
    background: #ffffff;
    z-index: 22;
    box-shadow: 0px -2px 4px 0px rgba(31, 35, 41, 0.08);
  }
}
</style>

These changes enhance readability by improving naming consistency and clarifying some logical steps within the code.

id__in=document_id_list).values('id'),
TaskType(instance.get('type')),
State.REVOKE)

def batch_edit_hit_handling(self, instance: Dict, with_valid=True):
if with_valid:
BatchSerializer(data=instance).is_valid(model=Document, raise_exception=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code contains several improvements, clarifications, and optimizations:

  1. BatchCancelInstanceSerializer:

    • Added validation to ensure type exists and supports specific value using an enumeration (TaskType).
    • Separated the method into individual functions: validate_instance, update_paragraph_status, and update_document_status.
    • Cleaned up the error messages for better clarity.
  2. CancelInstanceSerializer:

    • Moved the integer validation out of is_valid() for consistency.
  3. batch_batch_method():

    • Encapsulated common functionalities within separate functions (_get_buffer, _create_task) for cleaner code organization.
    • Simplified exception handling for batch methods.

Overall, these changes improve readability, maintainability, and functionality quality.

def put(self, request: Request, dataset_id: str):
return result.success(
DocumentSerializers.Batch(data={'dataset_id': dataset_id}).batch_cancel(request.data))

class Refresh(APIView):
authentication_classes = [TokenAuth]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There appears to be a few areas that need attention:

  1. Class Declaration: The Batch class declaration does not seem to have been properly indented relative to other classes.

  2. Method Definition: The put method definition within the Batch class is missing an opening curly brace ({).

  3. Response Handling: There seems to be an issue with how the response from result.success() is being defined or returned.

Here's a corrected version of your code:

@@ -238,6 +238,25 @@ def put(self, request: Request, dataset_id: str, document_id: str):
                     request.data
                 ))

 class Batch(APIView):
     authentication_classes = [TokenAuth]
 
-    @action(methods=['PUT'], detail=False)
+    @action(methods=['PUT'], detail=False)
+    @swagger_auto_schema(operation_summary="批量取消任务",
+                         operation_id="批量取消任务",
+                         request_body=DocumentApi.BatchCancel.get_request_body_api(),
+                         manual_parameters=DocumentSerializers.Create.get_request_params_api(),
+                         responses=result.get_default_response(),
+                         tags=["知识库/文档"]
+                         )
+    @has_permissions(
+        lambda r, k: Permission(group=Group.DATASET, operate=Operate.MANAGE,
+                                   dynamic_tag=k.get('dataset_id')))
+    def put(self, request: Request, dataset_id: str) -> JsonResponse:
+        # Ensure you are returning a JsonResponse object here
+        return result.success(
+            DocumentSerializers.Batch(data={'dataset_id': dataset_id}).batch_cancel(request.data))
 

 class Refresh(APIView):
     authentication_classes = [TokenAuth]

Key Corrections:

  • Indentation: Corrected the indentation of the Batch class and its methods.
  • Return Type: Ensured that the return statement returns a JsonResponse, which is expected by Django Rest Framework when using @swagger_auto_schema.

These changes should resolve any syntax errors or misconfigurations in your code.

@shaohuzhang1 shaohuzhang1 merged commit 89206c9 into main Dec 24, 2024
4 of 5 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@main@feat_batch_task branch December 24, 2024 09:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant