Commit
Merge remote-tracking branch 'origin/main' into neat-session-rules-provenance
doctrino committed Nov 10, 2024
2 parents 0e640d6 + a0f1f41 commit bba0050
Showing 17 changed files with 247 additions and 68 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
.PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up
-version="0.96.4"
+version="0.96.6"
run-explorer:
@echo "Running explorer API server..."
# open "http://localhost:8000/static/index.html" || true
1 change: 1 addition & 0 deletions cognite/neat/_constants.py
@@ -36,6 +36,7 @@ def _is_in_browser() -> bool:
DataModelId("cdf_cdm", "CogniteCore", "v1"),
DataModelId("cdf_idm", "CogniteProcessIndustries", "v1"),
)
+COGNITE_SPACES = frozenset(model.space for model in COGNITE_MODELS)
DMS_LISTABLE_PROPERTY_LIMIT = 1000

EXAMPLE_RULES = PACKAGE_DIRECTORY / "_rules" / "examples"
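Note: the new COGNITE_SPACES constant derives the set of spaces that host the built-in Cognite models. A minimal sketch of how such a frozenset supports cheap, immutable membership checks; the is_cognite_space helper is illustrative, not part of this commit:

from cognite.client.data_classes.data_modeling import DataModelId

COGNITE_MODELS = (
    DataModelId("cdf_cdm", "CogniteCore", "v1"),
    DataModelId("cdf_idm", "CogniteProcessIndustries", "v1"),
)
# The new constant: the spaces of the built-in models, as an immutable set.
COGNITE_SPACES = frozenset(model.space for model in COGNITE_MODELS)

def is_cognite_space(space: str) -> bool:
    # frozenset membership is O(1) and the set cannot be mutated later.
    return space in COGNITE_SPACES

assert is_cognite_space("cdf_cdm") and not is_cognite_space("my_space")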
6 changes: 5 additions & 1 deletion cognite/neat/_issues/_base.py
@@ -109,7 +109,11 @@ def _get_variables(self) -> tuple[dict[str, str], bool]:
def dump(self) -> dict[str, Any]:
"""Return a dictionary representation of the issue."""
variables = vars(self)
-output = {to_camel(key): self._dump_value(value) for key, value in variables.items() if value is not None}
+output = {
+    to_camel(key): self._dump_value(value)
+    for key, value in variables.items()
+    if not (value is None or key.startswith("_"))
+}
output["NeatIssue"] = type(self).__name__
return output

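Note: NeatIssue.dump now skips private attributes (leading underscore) in addition to unset values. A standalone sketch of the filtering, with a simplified to_camel stand-in for the real helper:

from typing import Any

def to_camel(snake: str) -> str:
    # Simplified stand-in for the helper used by NeatIssue.dump.
    first, *rest = snake.split("_")
    return first + "".join(word.capitalize() for word in rest)

def dump_issue_vars(variables: dict[str, Any]) -> dict[str, Any]:
    # None values and private attributes are both excluded.
    return {
        to_camel(key): value
        for key, value in variables.items()
        if not (value is None or key.startswith("_"))
    }

print(dump_issue_vars({"view_id": "Pump", "_cache": {}, "fix": None}))
# -> {'viewId': 'Pump'}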
8 changes: 5 additions & 3 deletions cognite/neat/_rules/exporters/_base.py
@@ -32,9 +32,11 @@ def _repr_html_(cls) -> str:
class CDFExporter(BaseExporter[T_VerifiedRules, T_Export]):
@abstractmethod
def export_to_cdf_iterable(
-    self, rules: T_VerifiedRules, client: CogniteClient, dry_run: bool = False
+    self, rules: T_VerifiedRules, client: CogniteClient, dry_run: bool = False, fallback_one_by_one: bool = False
) -> Iterable[UploadResult]:
raise NotImplementedError

-def export_to_cdf(self, rules: T_VerifiedRules, client: CogniteClient, dry_run: bool = False) -> UploadResultList:
-    return UploadResultList(self.export_to_cdf_iterable(rules, client, dry_run))
+def export_to_cdf(
+    self, rules: T_VerifiedRules, client: CogniteClient, dry_run: bool = False, fallback_one_by_one: bool = False
+) -> UploadResultList:
+    return UploadResultList(self.export_to_cdf_iterable(rules, client, dry_run, fallback_one_by_one))
70 changes: 61 additions & 9 deletions cognite/neat/_rules/exporters/_rules2dms.py
@@ -118,12 +118,16 @@ def _create_exclude_set(self):
def export(self, rules: DMSRules) -> DMSSchema:
return rules.as_schema(include_pipeline=self.export_pipeline, instance_space=self.instance_space)

-def delete_from_cdf(self, rules: DMSRules, client: CogniteClient, dry_run: bool = False) -> Iterable[UploadResult]:
+def delete_from_cdf(
+    self, rules: DMSRules, client: CogniteClient, dry_run: bool = False, skip_space: bool = False
+) -> Iterable[UploadResult]:
to_export = self._prepare_exporters(rules, client)

# we need to reverse order in which we are picking up the items to delete
# as they are sorted in the order of creation and we need to delete them in reverse order
for items, loader in reversed(to_export):
+    if skip_space and isinstance(loader, SpaceLoader):
+        continue
item_ids = loader.get_ids(items)
existing_items = loader.retrieve(item_ids)
existing_ids = loader.get_ids(existing_items)
@@ -162,10 +166,15 @@ def delete_from_cdf(self, rules: DMSRules, client: CogniteClient, dry_run: bool
)

def export_to_cdf_iterable(
-    self, rules: DMSRules, client: CogniteClient, dry_run: bool = False
+    self, rules: DMSRules, client: CogniteClient, dry_run: bool = False, fallback_one_by_one: bool = False
) -> Iterable[UploadResult]:
to_export = self._prepare_exporters(rules, client)

+result_by_name = {}
+if self.existing_handling == "force":
+    for delete_result in self.delete_from_cdf(rules, client, dry_run, skip_space=True):
+        result_by_name[delete_result.name] = delete_result

redeploy_data_model = False
for items, loader in to_export:
# The conversion from DMS to GraphQL does not seem to be triggered even if the views
@@ -183,8 +192,10 @@
created: set[Hashable] = set()
skipped: set[Hashable] = set()
changed: set[Hashable] = set()
+deleted: set[Hashable] = set()
failed_created: set[Hashable] = set()
failed_changed: set[Hashable] = set()
+failed_deleted: set[Hashable] = set()
error_messages: list[str] = []
if dry_run:
if self.existing_handling in ["update", "force"]:
@@ -200,42 +211,83 @@
try:
loader.delete(to_delete)
except CogniteAPIError as e:
-    error_messages.append(f"Failed delete: {e.message}")
+    if fallback_one_by_one:
+        for item in to_delete:
+            try:
+                loader.delete([item])
+            except CogniteAPIError as item_e:
+                failed_deleted.add(loader.get_id(item))
+                error_messages.append(f"Failed delete: {item_e!s}")
+            else:
+                deleted.add(loader.get_id(item))
+    else:
+        error_messages.append(f"Failed delete: {e!s}")
+        failed_deleted.update(loader.get_id(item) for item in e.failed + e.unknown)
+else:
+    deleted.update(loader.get_id(item) for item in to_delete)

if isinstance(loader, DataModelingLoader):
to_create = loader.sort_by_dependencies(to_create)

try:
loader.create(to_create)
except CogniteAPIError as e:
-    failed_created.update(loader.get_id(item) for item in e.failed + e.unknown)
-    created.update(loader.get_id(item) for item in e.successful)
-    error_messages.append(e.message)
+    if fallback_one_by_one:
+        for item in to_create:
+            try:
+                loader.create([item])
+            except CogniteAPIError as item_e:
+                failed_created.add(loader.get_id(item))
+                error_messages.append(f"Failed create: {item_e!s}")
+            else:
+                created.add(loader.get_id(item))
+    else:
+        failed_created.update(loader.get_id(item) for item in e.failed + e.unknown)
+        created.update(loader.get_id(item) for item in e.successful)
+        error_messages.append(f"Failed create: {e!s}")
else:
created.update(loader.get_id(item) for item in to_create)

if self.existing_handling in ["update", "force"]:
try:
loader.update(to_update)
except CogniteAPIError as e:
-    failed_changed.update(loader.get_id(item) for item in e.failed + e.unknown)
-    changed.update(loader.get_id(item) for item in e.successful)
-    error_messages.append(e.message)
+    if fallback_one_by_one:
+        for item in to_update:
+            try:
+                loader.update([item])
+            except CogniteAPIError as e_item:
+                failed_changed.add(loader.get_id(item))
+                error_messages.append(f"Failed update: {e_item!s}")
+            else:
+                changed.add(loader.get_id(item))
+    else:
+        failed_changed.update(loader.get_id(item) for item in e.failed + e.unknown)
+        changed.update(loader.get_id(item) for item in e.successful)
+        error_messages.append(f"Failed update: {e!s}")
else:
changed.update(loader.get_id(item) for item in to_update)
elif self.existing_handling == "skip":
skipped.update(loader.get_id(item) for item in to_update)
elif self.existing_handling == "fail":
failed_changed.update(loader.get_id(item) for item in to_update)

+if loader.resource_name in result_by_name:
+    delete_result = result_by_name[loader.resource_name]
+    deleted.update(delete_result.deleted)
+    failed_deleted.update(delete_result.failed_deleted)
+    error_messages.extend(delete_result.error_messages)

yield UploadResult(
name=loader.resource_name,
created=created,
changed=changed,
+deleted=deleted,
unchanged={loader.get_id(item) for item in unchanged},
skipped=skipped,
failed_created=failed_created,
failed_changed=failed_changed,
+failed_deleted=failed_deleted,
error_messages=error_messages,
issues=issue_list,
)
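Note: the delete, create, and update branches above all share one batch-then-fallback shape: attempt the bulk call, and on CogniteAPIError either record the partial result the SDK reports, or, when fallback_one_by_one is set, retry each item alone so a single bad item cannot fail the whole batch. Condensed for the create case, with loader duck-typed as in the diff:

from cognite.client.exceptions import CogniteAPIError

def create_with_fallback(loader, to_create, fallback_one_by_one: bool = False):
    created, failed_created, error_messages = set(), set(), []
    try:
        loader.create(to_create)
    except CogniteAPIError as e:
        if fallback_one_by_one:
            for item in to_create:  # retry one item at a time
                try:
                    loader.create([item])
                except CogniteAPIError as item_e:
                    failed_created.add(loader.get_id(item))
                    error_messages.append(f"Failed create: {item_e!s}")
                else:
                    created.add(loader.get_id(item))
        else:
            # The exception lists which items failed, succeeded, or are unknown.
            failed_created.update(loader.get_id(item) for item in e.failed + e.unknown)
            created.update(loader.get_id(item) for item in e.successful)
            error_messages.append(f"Failed create: {e!s}")
    else:
        created.update(loader.get_id(item) for item in to_create)
    return created, failed_created, error_messages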
6 changes: 5 additions & 1 deletion cognite/neat/_rules/importers/_spreadsheet2rules.py
@@ -47,6 +47,8 @@
("Classes", "Classes", "Class"),
("Containers", "Containers", "Container"),
("Views", "Views", "View"),
("Enum", "Enum", "Collection"),
("Nodes", "Nodes", "Node"),
]

MANDATORY_SHEETS_BY_ROLE: dict[RoleTypes, set[str]] = {
@@ -112,7 +114,7 @@ def __init__(
issue_list: IssueList,
required: bool = True,
metadata: MetadataRaw | None = None,
-sheet_prefix: Literal["", "Last", "Ref"] = "",
+sheet_prefix: Literal["", "Last", "Ref", "CDMRef"] = "",
):
self.issue_list = issue_list
self.required = required
@@ -267,6 +269,8 @@ def to_rules(self) -> ReadRules[T_InputRules]:
reference_read: ReadResult | None = None
if any(sheet_name.startswith("Ref") for sheet_name in user_reader.seen_sheets):
reference_read = SpreadsheetReader(issue_list, sheet_prefix="Ref").read(excel_file, self.filepath)
+elif any(sheet_name.startswith("CDMRef") for sheet_name in user_reader.seen_sheets):
+    reference_read = SpreadsheetReader(issue_list, sheet_prefix="CDMRef").read(excel_file, self.filepath)

if issue_list.has_errors:
return ReadRules(None, issue_list, {})
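Note: reference sheets are now recognized under a second prefix. Since "CDMRef" does not match startswith("Ref"), the two checks are disjoint. A toy distillation of the selection logic:

def reference_sheet_prefix(seen_sheets: set[str]) -> str | None:
    # Mirrors to_rules: plain "Ref" sheets are checked first, then "CDMRef".
    if any(name.startswith("Ref") for name in seen_sheets):
        return "Ref"
    if any(name.startswith("CDMRef") for name in seen_sheets):
        return "CDMRef"
    return None

assert reference_sheet_prefix({"Views", "RefProperties"}) == "Ref"
assert reference_sheet_prefix({"Views", "CDMRefViews"}) == "CDMRef"
assert reference_sheet_prefix({"Views"}) is None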
19 changes: 16 additions & 3 deletions cognite/neat/_rules/models/dms/_exporter.py
@@ -402,17 +402,30 @@ def _gather_properties(

return container_properties_by_id, view_properties_by_id

-@staticmethod
def _gather_properties_with_ancestors(
+    self,
view_properties_by_id: dict[dm.ViewId, list[DMSProperty]],
views: Sequence[DMSView],
) -> dict[dm.ViewId, list[DMSProperty]]:
+all_view_properties_by_id = view_properties_by_id.copy()
+if self.rules.reference:
+    # We need to include t
+    ref_view_properties_by_id = self._gather_properties(self.rules.reference.properties)[1]
+    for view_id, properties in ref_view_properties_by_id.items():
+        if view_id not in all_view_properties_by_id:
+            all_view_properties_by_id[view_id] = properties
+        else:
+            existing_properties = {prop._identifier() for prop in all_view_properties_by_id[view_id]}
+            for prop in properties:
+                if prop._identifier() not in existing_properties:
+                    all_view_properties_by_id[view_id].append(prop)

view_properties_with_parents_by_id: dict[dm.ViewId, list[DMSProperty]] = defaultdict(list)
view_by_view_id = {view.view.as_id(): view for view in views}
for view in views:
view_id = view.view.as_id()
seen: set[Hashable] = set()
-if view_properties := view_properties_by_id.get(view_id):
+if view_properties := all_view_properties_by_id.get(view_id):
view_properties_with_parents_by_id[view_id].extend(view_properties)
seen.update(prop._identifier() for prop in view_properties)
if not view.implements:
@@ -428,7 +441,7 @@ def _gather_properties_with_ancestors(
parents.append(grandparent)
seen_parents.add(grandparent)

-if not (parent_view_properties := view_properties_by_id.get(parent_view_id)):
+if not (parent_view_properties := all_view_properties_by_id.get(parent_view_id)):
continue
for prop in parent_view_properties:
new_prop = prop.model_copy(update={"view": view.view})
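Note: _gather_properties_with_ancestors now folds reference properties into the lookup before walking ancestors, so user-defined properties win and reference properties only fill the gaps. A simplified sketch with plain strings standing in for DMSProperty identifiers:

def merge_view_properties(
    user: dict[str, list[str]], reference: dict[str, list[str]]
) -> dict[str, list[str]]:
    merged = {view_id: list(props) for view_id, props in user.items()}
    for view_id, props in reference.items():
        if view_id not in merged:
            merged[view_id] = list(props)
        else:
            # Only add reference properties the user has not already defined.
            existing = set(merged[view_id])
            merged[view_id].extend(p for p in props if p not in existing)
    return merged

print(merge_view_properties({"Pump": ["name"]}, {"Pump": ["name", "tag"], "Valve": ["size"]}))
# -> {'Pump': ['name', 'tag'], 'Valve': ['size']}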
43 changes: 30 additions & 13 deletions cognite/neat/_rules/models/dms/_rules.py
@@ -1,17 +1,16 @@
import math
-import sys
import warnings
from collections.abc import Hashable
from datetime import datetime
-from typing import TYPE_CHECKING, Any, ClassVar, Literal
+from typing import Any, ClassVar, Literal

import pandas as pd
from cognite.client import data_modeling as dm
from pydantic import Field, field_serializer, field_validator, model_validator
from pydantic_core.core_schema import SerializationInfo, ValidationInfo
from rdflib import URIRef

-from cognite.neat._constants import DEFAULT_NAMESPACE
+from cognite.neat._constants import COGNITE_SPACES, DEFAULT_NAMESPACE
from cognite.neat._issues import MultiValueError
from cognite.neat._issues.warnings import (
PrincipleMatchingSpaceAndVersionWarning,
@@ -58,14 +57,6 @@

from ._schema import DMSSchema

-if TYPE_CHECKING:
-    pass
-
-if sys.version_info >= (3, 11):
-    pass
-else:
-    pass

_DEFAULT_VERSION = "1"


@@ -199,6 +190,24 @@ def connections_value_type(
raise ValueError(f"Reverse connection must have a value type that points to a view, got {value}")
return value

+@field_validator("container", "container_property", mode="after")
+def container_set_correctly(cls, value: Any, info: ValidationInfo) -> Any:
+    if (connection := info.data.get("connection")) is None:
+        return value
+    if connection == "direct" and value is None:
+        raise ValueError(
+            "You must provide a container and container property for where to store direct connections"
+        )
+    elif isinstance(connection, EdgeEntity) and value is not None:
+        raise ValueError(
+            "Edge connections are not stored in a container, please remove the container and container property"
+        )
+    elif isinstance(connection, ReverseConnectionEntity) and value is not None:
+        raise ValueError(
+            "Reverse connections are not stored in a container, please remove the container and container property"
+        )
+    return value

@field_serializer("reference", when_used="always")
def set_reference(self, value: Any, info: SerializationInfo) -> str | None:
if isinstance(info.context, dict) and info.context.get("as_reference") is True:
@@ -423,15 +432,23 @@ def matching_version_and_space(cls, value: SheetList[DMSView], info: ValidationI
if not (metadata := info.data.get("metadata")):
return value
model_version = metadata.version
-if different_version := [view.view.as_id() for view in value if view.view.version != model_version]:
+if different_version := [
+    view.view.as_id()
+    for view in value
+    if view.view.version != model_version and view.view.space not in COGNITE_SPACES
+]:
for view_id in different_version:
warnings.warn(
PrincipleMatchingSpaceAndVersionWarning(
f"The view {view_id!r} has a different version than the data model version, {model_version}",
),
stacklevel=2,
)
-if different_space := [view.view.as_id() for view in value if view.view.space != metadata.space]:
+if different_space := [
+    view.view.as_id()
+    for view in value
+    if view.view.space != metadata.space and view.view.space not in COGNITE_SPACES
+]:
for view_id in different_space:
warnings.warn(
PrincipleMatchingSpaceAndVersionWarning(
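Note: the new container_set_correctly validator enforces that direct connections declare where they are stored, while edge and reverse connections must not. The same rule as a plain function, with strings standing in for the EdgeEntity and ReverseConnectionEntity types:

def check_container_usage(connection: str | None, container: str | None) -> None:
    if connection is None:
        return
    if connection == "direct" and container is None:
        raise ValueError(
            "You must provide a container and container property for where to store direct connections"
        )
    if connection in ("edge", "reverse") and container is not None:
        raise ValueError(
            f"{connection.capitalize()} connections are not stored in a container, "
            "please remove the container and container property"
        )

check_container_usage("direct", "PumpContainer")  # ok
check_container_usage("edge", None)               # ok
# check_container_usage("edge", "PumpContainer")  # raises ValueError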
12 changes: 10 additions & 2 deletions cognite/neat/_session/_to.py
@@ -62,12 +62,18 @@ def instances(self, space: str | None = None):

return loader.load_into_cdf(self._client)

-def data_model(self, existing_handling: Literal["fail", "skip", "update", "force"] = "skip", dry_run: bool = False):
+def data_model(
+    self,
+    existing_handling: Literal["fail", "skip", "update", "force"] = "skip",
+    dry_run: bool = False,
+    fallback_one_by_one: bool = False,
+):
"""Export the verified DMS data model to CDF.
Args:
existing_handling: How to handle if component of data model exists. Defaults to "skip".
dry_run: If True, no changes will be made to CDF. Defaults to False.
+fallback_one_by_one: If True, will fall back to one-by-one upload if batch upload fails. Defaults to False.
... note::
@@ -85,7 +91,9 @@ def data_model(self, existing_handling: Literal["fail", "skip", "update", "force

conversion_issues = IssueList(action="to.cdf.data_model")
with catch_warnings(conversion_issues):
-result = exporter.export_to_cdf(self._state.data_model.last_verified_dms_rules[1], self._client, dry_run)
+result = exporter.export_to_cdf(
+    self._state.data_model.last_verified_dms_rules[1], self._client, dry_run, fallback_one_by_one
+)
result.insert(0, UploadResultCore(name="schema", issues=conversion_issues))
self._state.data_model.outcome.append(result)
print("You can inspect the details with the .inspect.outcome(...) method.")
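Note: a hypothetical end-to-end call of the updated session method. The NeatSession setup and the to.cdf.data_model call path are assumptions for illustration, not shown in this diff:

from cognite.client import CogniteClient
from cognite.neat import NeatSession

client = CogniteClient()  # assumes credentials are configured in the environment
neat = NeatSession(client)
# ... read and verify a DMS data model first, then:
neat.to.cdf.data_model(
    existing_handling="force",  # delete and recreate components that already exist
    dry_run=False,
    fallback_one_by_one=True,   # retry items one at a time if a batch call fails
)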
2 changes: 1 addition & 1 deletion cognite/neat/_utils/cdf/loaders/_base.py
@@ -27,7 +27,7 @@ def __init__(self, client: CogniteClient) -> None:

@classmethod
@abstractmethod
-def get_id(cls, item: T_WriteClass | T_WritableCogniteResource) -> T_ID:
+def get_id(cls, item: T_WriteClass | T_WritableCogniteResource | dict | T_ID) -> T_ID:
raise NotImplementedError

@classmethod
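Note: widening get_id to also accept a dict or an already-resolved id lets callers pass whichever representation they hold. A sketch of what an implementation can now handle, using view resources as an example; the body is illustrative, the diff only changes the abstract signature:

from cognite.client import data_modeling as dm

def get_view_id(item: dm.ViewApply | dm.View | dict | dm.ViewId) -> dm.ViewId:
    if isinstance(item, dm.ViewId):
        return item  # already an id
    if isinstance(item, dict):
        # e.g. {"space": "my_space", "externalId": "Pump", "version": "v1"}
        return dm.ViewId(item["space"], item["externalId"], item.get("version"))
    return item.as_id()  # write or read resource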
(Diffs for the remaining changed files are not shown here.)
