Skip to content

Commit

Permalink
Merge pull request #751 from cognitedata/main
Browse files Browse the repository at this point in the history
Release 0.97.2
  • Loading branch information
doctrino authored Nov 14, 2024
2 parents 5aca436 + 9cf41d3 commit f280fea
Show file tree
Hide file tree
Showing 19 changed files with 272 additions and 132 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up
version="0.97.1"
version="0.97.2"
run-explorer:
@echo "Running explorer API server..."
# open "http://localhost:8000/static/index.html" || true
Expand Down
9 changes: 8 additions & 1 deletion cognite/neat/_graph/loaders/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class CDFLoader(BaseLoader[T_Output]):
def load_into_cdf(
    self, client: CogniteClient, dry_run: bool = False, check_client: bool = True
) -> UploadResultList:
    """Load the output items into CDF, merging results that share a name.

    Args:
        client: Client used to write to CDF.
        dry_run: If True, nothing is written; the results reflect what would happen.
        check_client: If True, validate the client before loading.

    Returns:
        UploadResultList: One merged UploadResult per distinct result name.
    """
    # Merge per name so repeated batches for the same resource kind
    # collapse into a single aggregated UploadResult.
    upload_result_by_name: dict[str, UploadResult] = {}
    for upload_result in self.load_into_cdf_iterable(client, dry_run, check_client):
        if last_result := upload_result_by_name.get(upload_result.name):
            upload_result_by_name[upload_result.name] = last_result.merge(upload_result)
        else:
            upload_result_by_name[upload_result.name] = upload_result

    return UploadResultList(upload_result_by_name.values())

def load_into_cdf_iterable(
self, client: CogniteClient, dry_run: bool = False, check_client: bool = True
Expand Down
50 changes: 35 additions & 15 deletions cognite/neat/_graph/loaders/_rdf2dms.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import json
from collections.abc import Iterable, Sequence
from pathlib import Path
Expand Down Expand Up @@ -295,9 +296,24 @@ def _upload_to_cdf(
dry_run: bool,
read_issues: IssueList,
) -> Iterable[UploadResult]:
nodes: list[dm.NodeApply] = []
edges: list[dm.EdgeApply] = []
source_by_node_id: dict[dm.NodeId, str] = {}
source_by_edge_id: dict[dm.EdgeId, str] = {}
for item in items:
if isinstance(item, dm.NodeApply):
nodes.append(item)
if item.sources:
source_by_node_id[item.as_id()] = item.sources[0].source.external_id
else:
source_by_node_id[item.as_id()] = "node"
elif isinstance(item, dm.EdgeApply):
edges.append(item)
if item.sources:
source_by_edge_id[item.as_id()] = item.sources[0].source.external_id
else:
source_by_edge_id[item.as_id()] = "edge"
try:
nodes = [item for item in items if isinstance(item, dm.NodeApply)]
edges = [item for item in items if isinstance(item, dm.EdgeApply)]
upserted = client.data_modeling.instances.apply(
nodes,
edges,
Expand All @@ -312,19 +328,23 @@ def _upload_to_cdf(
result.created.update(item.as_id() for item in e.successful)
yield result
else:
for instance_type, instances in {
"Nodes": upserted.nodes,
"Edges": upserted.edges,
}.items():
result = UploadResult[InstanceId](name=instance_type, issues=read_issues)
for instance in instances: # type: ignore[attr-defined]
if instance.was_modified and instance.created_time == instance.last_updated_time:
result.created.add(instance.as_id())
elif instance.was_modified:
result.changed.add(instance.as_id())
else:
result.unchanged.add(instance.as_id())
yield result
for instances, ids_by_source in [
(upserted.nodes, source_by_node_id),
(upserted.edges, source_by_edge_id),
]:
for name, subinstances in itertools.groupby(
sorted(instances, key=lambda i: ids_by_source[i.as_id()]), # type: ignore[call-overload, index, attr-defined]
key=lambda i: ids_by_source[i.as_id()], # type: ignore[index]
):
result = UploadResult(name=name, issues=read_issues)
for instance in subinstances: # type: ignore[attr-defined]
if instance.was_modified and instance.created_time == instance.last_updated_time:
result.created.add(instance.as_id())
elif instance.was_modified:
result.changed.add(instance.as_id())
else:
result.unchanged.add(instance.as_id())
yield result


def _get_field_value_types(cls, info):
Expand Down
4 changes: 4 additions & 0 deletions cognite/neat/_issues/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

from ._base import (
DefaultWarning,
FutureResult,
IssueList,
MultiValueError,
NeatError,
NeatIssue,
NeatIssueList,
NeatWarning,
catch_issues,
catch_warnings,
)

Expand All @@ -21,4 +23,6 @@
"IssueList",
"MultiValueError",
"catch_warnings",
"catch_issues",
"FutureResult",
]
48 changes: 48 additions & 0 deletions cognite/neat/_issues/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
PropertyId,
ViewId,
)
from pydantic import ValidationError
from pydantic_core import ErrorDetails

from cognite.neat._utils.spreadsheet import SpreadsheetRead
Expand Down Expand Up @@ -398,6 +399,10 @@ def warnings(self) -> Self:
"""Return all the warnings in this list."""
return type(self)([issue for issue in self if isinstance(issue, NeatWarning)]) # type: ignore[misc]

def has_error_type(self, error_type: type[NeatError]) -> bool:
    """Check whether at least one issue in this list is an error of the given type."""
    for issue in self:
        if isinstance(issue, error_type):
            return True
    return False

def as_errors(self, operation: str = "Operation failed") -> ExceptionGroup:
"""Return an ExceptionGroup with all the errors in this list."""
return ExceptionGroup(
Expand Down Expand Up @@ -473,3 +478,46 @@ def catch_warnings(
finally:
if warning_logger and issues is not None:
issues.extend([warning_cls.from_warning(warning) for warning in warning_logger]) # type: ignore[misc]


class FutureResult:
    """Mutable holder for the outcome of a context-managed operation.

    Starts out as "pending"; the owning context manager sets the private
    ``_result`` attribute to "success" or "failure" when the block finishes.
    """

    def __init__(self) -> None:
        # Only the owning context manager is expected to mutate this.
        self._result: Literal["success", "failure", "pending"] = "pending"

    @property
    def result(self) -> Literal["success", "failure", "pending"]:
        """Current outcome; remains "pending" until the managed block completes."""
        return self._result


@contextmanager
def catch_issues(
    issues: IssueList,
    error_cls: type[NeatError] = NeatError,
    warning_cls: type[NeatWarning] = NeatWarning,
    error_args: dict[str, Any] | None = None,
) -> Iterator[FutureResult]:
    """Context manager that converts raised errors and emitted warnings into issues.

    Args:
        issues: The issue list that collected errors/warnings are appended to.
        error_cls: The NeatError subclass used to convert pydantic validation
            errors into issues.
        warning_cls: The NeatWarning subclass used to convert warnings into issues.
        error_args: Extra keyword arguments forwarded to
            ``error_cls.from_pydantic_errors`` (e.g. read context).

    Returns:
        FutureResult: A future result object that can be used to check the
        result ("success"/"failure"/"pending") after the with-block exits.
    """
    # catch_warnings collects warnings raised inside the block into `issues`.
    with catch_warnings(issues, warning_cls):
        future_result = FutureResult()
        try:
            yield future_result
        except ValidationError as e:
            # A single pydantic ValidationError can expand into many issues.
            issues.extend(error_cls.from_pydantic_errors(e.errors(), **(error_args or {})))
            future_result._result = "failure"
        except MultiValueError as e:
            # MultiValueError is a container of already-built issues.
            issues.extend(e.errors)
            future_result._result = "failure"
        except NeatError as e:
            # NOTE: must come after the more specific handlers above.
            issues.append(e)
            future_result._result = "failure"
        else:
            future_result._result = "success"
4 changes: 3 additions & 1 deletion cognite/neat/_issues/errors/_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ class NeatTypeError(NeatError, TypeError):

@dataclass(unsafe_hash=True)
class RegexViolationError(NeatError, ValueError):
    """Value, {value} in {location} failed regex, {regex}, validation.
    Make sure that the name follows the regex pattern."""

    # The offending value, the pattern it failed, and where in the input it appeared.
    value: str
    regex: str
    location: str


@dataclass(unsafe_hash=True)
Expand Down
40 changes: 24 additions & 16 deletions cognite/neat/_rules/models/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,33 @@ def _validator(value: Any, next_: Any, ctx: ValidationInfo) -> Any:
# Annotated string types that attach a regex constraint plus a custom
# RegexViolationError (with a human-readable location) on violation.
PrefixType = Annotated[
    str,
    StringConstraints(pattern=PREFIX_COMPLIANCE_REGEX),
    _custom_error(
        lambda _, value: RegexViolationError(value, PREFIX_COMPLIANCE_REGEX, "prefix entry in metadata"),
    ),
]

DataModelExternalIdType = Annotated[
    str,
    StringConstraints(pattern=DATA_MODEL_COMPLIANCE_REGEX),
    _custom_error(
        lambda _, value: RegexViolationError(value, DATA_MODEL_COMPLIANCE_REGEX, "external ID entry in metadata")
    ),
]


VersionType = Annotated[
    str,
    # Coerce to str first so e.g. numeric versions from spreadsheets validate.
    BeforeValidator(str),
    StringConstraints(pattern=VERSION_COMPLIANCE_REGEX),
    _custom_error(lambda _, value: RegexViolationError(value, VERSION_COMPLIANCE_REGEX, "version entry in metadata")),
]


def _external_id_validation_factory(entity_type: EntityTypes):
def _external_id_validation_factory(entity_type: EntityTypes, location: str):
def _external_id_validation(value: str) -> str:
compiled_regex = PATTERNS.entity_pattern(entity_type)
if not compiled_regex.match(value):
raise RegexViolationError(value, compiled_regex.pattern)
raise RegexViolationError(value, compiled_regex.pattern, location)
if PATTERNS.more_than_one_alphanumeric.search(value):
warnings.warn(
RegexViolationWarning(
Expand All @@ -116,36 +120,40 @@ def _external_id_validation(value: str) -> str:

# Annotated string types validated against the external-id regex for
# their entity type; the location string feeds the error message.
SpaceType = Annotated[
    str,
    AfterValidator(_external_id_validation_factory(EntityTypes.space, "space entry in metadata")),
]

InformationPropertyType = Annotated[
    str,
    AfterValidator(_external_id_validation_factory(EntityTypes.information_property, "Property column in properties")),
]
DmsPropertyType = Annotated[
    str,
    AfterValidator(_external_id_validation_factory(EntityTypes.dms_property, "Property column in properties")),
]


def _entity_validation(value: Entities, location: str) -> Entities:
    """Validate that the entity suffix matches the regex for its entity type.

    Args:
        value: The entity whose suffix is validated.
        location: Human-readable spot in the input, used in the error message.

    Returns:
        The entity unchanged, if valid.

    Raises:
        RegexViolationError: If the suffix does not match the type's pattern.
    """
    suffix_regex = PATTERNS.entity_pattern(value.type_)
    if not suffix_regex.match(value.suffix):
        raise RegexViolationError(str(value), suffix_regex.pattern, location)
    return value


# Entity types validated after parsing; the location string names the
# spreadsheet column so regex errors point at the offending input.
ClassEntityType = Annotated[ClassEntity, AfterValidator(lambda v: _entity_validation(v, "the Class column"))]
ViewEntityType = Annotated[ViewEntity, AfterValidator(lambda v: _entity_validation(v, "the View column"))]
ContainerEntityType = Annotated[
    ContainerEntity, AfterValidator(lambda v: _entity_validation(v, "the Container column"))
]


def _multi_value_type_validation(value: MultiValueTypeInfo, location: str) -> MultiValueTypeInfo:
    """Validate every ClassEntity member of a multi-value type.

    Args:
        value: The multi-value type whose member types are checked.
        location: Human-readable spot in the input, forwarded to the error.

    Returns:
        The multi-value type unchanged, if all class entities are valid.
    """
    for type_ in value.types:
        # Only class entities carry a suffix that must match the entity regex.
        if isinstance(type_, ClassEntity):
            _entity_validation(type_, location)
    return value


# Multi-value types are validated member-by-member; errors point at the
# Value Type column of the input sheet.
MultiValueTypeType = Annotated[
    MultiValueTypeInfo, AfterValidator(lambda v: _multi_value_type_validation(v, "the Value Type column"))
]
51 changes: 2 additions & 49 deletions cognite/neat/_rules/transformers/_verification.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from abc import ABC
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, Literal

from pydantic import ValidationError

from cognite.neat._issues import IssueList, MultiValueError, NeatError, NeatWarning, catch_warnings
from cognite.neat._issues import IssueList, NeatError, NeatWarning, catch_issues
from cognite.neat._issues.errors import NeatTypeError
from cognite.neat._rules._shared import (
InputRules,
Expand Down Expand Up @@ -45,7 +41,7 @@ def transform(self, rules: T_InputRules | OutRules[T_InputRules]) -> MaybeRules[
if isinstance(rules, ReadRules):
error_args = rules.read_context
verified_rules: T_VerifiedRules | None = None
with _catch_issues(issues, NeatError, NeatWarning, error_args) as future:
with catch_issues(issues, NeatError, NeatWarning, error_args) as future:
rules_cls = self._get_rules_cls(in_)
verified_rules = rules_cls.model_validate(in_.dump()) # type: ignore[assignment]

Expand Down Expand Up @@ -92,46 +88,3 @@ def _get_rules_cls(self, in_: InputRules) -> type[VerifiedRules]:
return DomainRules
else:
raise NeatTypeError(f"Unsupported rules type: {type(in_)}")


class _FutureResult:
def __init__(self) -> None:
self._result: Literal["success", "failure", "pending"] = "pending"

@property
def result(self) -> Literal["success", "failure", "pending"]:
return self._result


@contextmanager
def _catch_issues(
    issues: IssueList,
    error_cls: type[NeatError] = NeatError,
    warning_cls: type[NeatWarning] = NeatWarning,
    error_args: dict[str, Any] | None = None,
) -> Iterator[_FutureResult]:
    """Context manager that converts raised errors and emitted warnings into issues.

    Args:
        issues: The issue list that collected errors/warnings are appended to.
        error_cls: The NeatError subclass used to convert pydantic validation
            errors into issues.
        warning_cls: The NeatWarning subclass used to convert warnings into issues.
        error_args: Extra keyword arguments forwarded to
            ``error_cls.from_pydantic_errors`` (e.g. read context).

    Returns:
        _FutureResult: A future result object that can be used to check the
        result ("success"/"failure"/"pending") after the with-block exits.
    """
    # catch_warnings collects warnings raised inside the block into `issues`.
    with catch_warnings(issues, warning_cls):
        future_result = _FutureResult()
        try:
            yield future_result
        except ValidationError as e:
            # A single pydantic ValidationError can expand into many issues.
            issues.extend(error_cls.from_pydantic_errors(e.errors(), **(error_args or {})))
            future_result._result = "failure"
        except MultiValueError as e:
            # MultiValueError is a container of already-built issues.
            issues.extend(e.errors)
            future_result._result = "failure"
        except NeatError as e:
            # NOTE: must come after the more specific handlers above.
            issues.append(e)
            future_result._result = "failure"
        else:
            future_result._result = "success"
Loading

0 comments on commit f280fea

Please sign in to comment.