Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add URI generator, resolver and corresponding document open & create functions #3211

Merged
merged 16 commits into from
Jan 15, 2025

Conversation

bobbai00
Copy link
Collaborator

@bobbai00 bobbai00 commented Jan 14, 2025

This PR refactor the storage key by representing it using the storage URI. And this PR also adds the openDocument and createDocument function using the given URI.

Major Changes

1. VFS URI resource type definition, resolve and decode functions

Two types of VFS resources are defined:

object VFSResourceType extends Enumeration {
  val RESULT: Value = Value("result")
  val MATERIALIZED_RESULT: Value = Value("materializedResult")
}

Two defs are added to the FileResolver

  • resolve: create the URI pointing to the storage resource on the VFS
/**
    * Resolve a VFS resource to its URI. The URI can be used by the DocumentFactory to create resource or open resource
    *
    * @param resourceType   The type of the VFS resource.
    * @param workflowId     Workflow identifier.
    * @param executionId    Execution identifier.
    * @param operatorId     Operator identifier.
    * @param portIdentity   Optional port identifier. **Required** if `resourceType` is `RESULT` or `MATERIALIZED_RESULT`.
    * @return A VFS URI
    * @throws IllegalArgumentException if `resourceType` is `RESULT` but `portIdentity` is missing.
    */
  def resolve(
      resourceType: VFSResourceType.Value,
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity,
      operatorId: OperatorIdentity,
      portIdentity: Option[PortIdentity] = None
  ): URI
  • decodeVFSUri: decode a VFS URI to components
  /**
    * Parses a VFS URI and extracts its components
    *
    * @param uri The VFS URI to parse.
    * @return A `VFSUriComponents` object with the extracted data.
    * @throws IllegalArgumentException if the URI is malformed.
    */
  def decodeVFSUri(uri: URI): (
      WorkflowIdentity,
      ExecutionIdentity,
      OperatorIdentity,
      Option[PortIdentity],
      VFSResourceType.Value
  )

2. createDocument and openDocument functions to the DocumentFactory

createDocument and openDocument defs to create/open a storage resource pointed by the URI

  • DocumentFactory.createDocument
  /**
    * Create a document for storage specified by the uri.
    * This document is suitable for storing structural data, i.e. the schema is required to create such document.
    * @param uri the location of the document
    * @param schema the schema of the data stored in the document
    * @return the created document
    */
  def createDocument(uri: URI, schema: Schema): VirtualDocument[_]
  • DocumentFactory.openDocument
  /**
    * Open a document specified by the uri.
    * The document should be storing the structural data as the document and the schema will be returned
    * @param uri the uri of the document
    * @return the VirtualDocument, which is the handler of the data; the Schema, which is the schema of the data stored in the document
    */
  def openDocument(uri: URI): (VirtualDocument[_], Schema)

@bobbai00 bobbai00 force-pushed the jiadong-add-uri-generator-and-doc-factory branch from 7d406eb to 0f9d636 Compare January 14, 2025 03:49
@bobbai00 bobbai00 self-assigned this Jan 14, 2025
Copy link
Collaborator

@shengquan-ni shengquan-ni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left minor comments.

@bobbai00 bobbai00 added the refactoring Refactor the code label Jan 14, 2025
@bobbai00 bobbai00 force-pushed the jiadong-add-uri-generator-and-doc-factory branch from 92e7a04 to 136f9a6 Compare January 14, 2025 22:04
@bobbai00 bobbai00 force-pushed the jiadong-add-uri-generator-and-doc-factory branch from f0bc26c to 6d56d81 Compare January 15, 2025 00:43
@bobbai00 bobbai00 merged commit d4d4176 into master Jan 15, 2025
8 checks passed
@bobbai00 bobbai00 deleted the jiadong-add-uri-generator-and-doc-factory branch January 15, 2025 01:19
Copy link
Collaborator

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left other comments offline.

* @param operatorId Operator identifier.
* @param portIdentity Optional port identifier. **Required** if `resourceType` is `RESULT` or `MATERIALIZED_RESULT`.
* @return A VFS URI
* @throws IllegalArgumentException if `resourceType` is `RESULT` but `portIdentity` is missing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest you use multiple explicit APIs instead:
createPortResultURI(workflowId, eId, opId, portId)
createOperatorLogURI(workflowId, eid, opId)

case Some(existingUris) => Some(uri :: existingUris) // Prepend URI to the existing list
case None => Some(List(uri)) // Create a new list if key doesn't exist
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to remove some URI?

@@ -188,7 +202,7 @@ class ExecutionResultService(
2.seconds,
resultPullingFrequency.seconds
) {
onResultUpdate(physicalPlan)
onResultUpdate(executionId, physicalPlan)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Physical plan should have context already, which contains the Eid?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
refactoring Refactor the code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants