diff --git a/CHANGELOG.md b/CHANGELOG.md index bede2f68a..37fc6dafb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -166,8 +166,10 @@ and will be removed for release. - Added type hints. - [PR #2057](https://github.com/RDFLib/rdflib/pull/2057). - `rdflib.store` and builtin stores have mostly complete type hints. + [PR #2057](https://github.com/RDFLib/rdflib/pull/2057). + - `rdflib.graph` have mostly complete type hints. + [PR #2080](https://github.com/RDFLib/rdflib/pull/2080). diff --git a/devtools/diffrtpy.py b/devtools/diffrtpy.py index 3c2c766f1..d8873aa5e 100755 --- a/devtools/diffrtpy.py +++ b/devtools/diffrtpy.py @@ -32,8 +32,16 @@ from strip_hints import strip_string_to_string -def clean_python(code: str) -> str: - code = strip_string_to_string(code, to_empty=True, strip_nl=True) +def clean_python(input: Path) -> str: + code = input.read_text() + try: + code = strip_string_to_string(code, to_empty=True, strip_nl=True) + except Exception: + logging.warning( + "failed to strip type hints from %s, falling back to using with type hints", + input, + ) + code = code code = python_minifier.minify( code, remove_annotations=True, @@ -106,12 +114,12 @@ def handle(self, parse_result: argparse.Namespace) -> None: "base = %s, lhs_file = %s, rhs_file = %s", base, lhs_file, rhs_file ) - lhs_file_content = lhs_file.read_text() - rhs_file_content = rhs_file.read_text() - if lhs_file.name.endswith(".py") and rhs_file.name.endswith(".py"): - lhs_file_content = clean_python(lhs_file_content) - rhs_file_content = clean_python(rhs_file_content) + lhs_file_content = clean_python(lhs_file) + rhs_file_content = clean_python(rhs_file) + else: + lhs_file_content = lhs_file.read_text() + rhs_file_content = rhs_file.read_text() lhs_file_lines = lhs_file_content.splitlines(keepends=True) rhs_file_lines = rhs_file_content.splitlines(keepends=True) diff --git a/docs/conf.py b/docs/conf.py index 3c9965a25..802e8cb4f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -264,6 +264,7 @@ def find_version(filename): ("py:class", "importlib.metadata.EntryPoint"), ("py:class", "xml.dom.minidom.Document"), ("py:class", "xml.dom.minidom.DocumentFragment"), + ("py:class", "isodate.duration.Duration"), # sphinx-autodoc-typehints has some issues with TypeVars. # https://github.com/tox-dev/sphinx-autodoc-typehints/issues/39 ("py:class", "rdflib.plugin.PluginT"), @@ -282,13 +283,23 @@ def find_version(filename): if sys.version_info < (3, 9): nitpick_ignore.extend( [ - ("py:class", "_TriplePatternType"), - ("py:class", "_TripleType"), + ("py:class", "_ContextIdentifierType"), + ("py:class", "_ContextType"), + ("py:class", "_GraphT"), + ("py:class", "_NamespaceSetString"), ("py:class", "_ObjectType"), ("py:class", "_PredicateType"), + ("py:class", "_QuadSelectorType"), ("py:class", "_SubjectType"), - ("py:class", "_ContextType"), - ("py:class", "_ContextIdentifierType"), + ("py:class", "_TripleOrPathTripleType"), + ("py:class", "_TripleOrQuadPathPatternType"), + ("py:class", "_TripleOrQuadPatternType"), + ("py:class", "_TriplePathPatternType"), + ("py:class", "_TriplePathType"), + ("py:class", "_TriplePatternType"), + ("py:class", "_TripleSelectorType"), + ("py:class", "_TripleType"), + ("py:class", "_TripleOrTriplePathType"), ("py:class", "TextIO"), ] ) diff --git a/rdflib/graph.py b/rdflib/graph.py index 7dc69dad0..586592577 100644 --- a/rdflib/graph.py +++ b/rdflib/graph.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import pathlib import random @@ -7,9 +9,15 @@ TYPE_CHECKING, Any, BinaryIO, + Callable, + Dict, Generator, Iterable, + List, + Mapping, + NoReturn, Optional, + Set, TextIO, Tuple, Type, @@ -34,7 +42,23 @@ from rdflib.resource import Resource from rdflib.serializer import Serializer from rdflib.store import Store -from rdflib.term import BNode, Genid, IdentifiedNode, Literal, Node, RDFLibGenid, URIRef +from rdflib.term import ( + BNode, + Genid, + IdentifiedNode, + Identifier, + Literal, + Node, + RDFLibGenid, + URIRef, + Variable, +) + +if TYPE_CHECKING: + import typing_extensions as te + + import rdflib.query + from rdflib.plugins.sparql.sparql import Query, Update _SubjectType = Node _PredicateType = Node @@ -46,19 +70,48 @@ _OptionalQuadType = Tuple[ "_SubjectType", "_PredicateType", "_ObjectType", Optional["_ContextType"] ] +_TripleOrOptionalQuadType = Union["_TripleType", "_OptionalQuadType"] _OptionalIdentifiedQuadType = Tuple[ "_SubjectType", "_PredicateType", "_ObjectType", Optional["_ContextIdentifierType"] ] _TriplePatternType = Tuple[ Optional["_SubjectType"], Optional["_PredicateType"], Optional["_ObjectType"] ] +_TriplePathPatternType = Tuple[Optional["_SubjectType"], Path, Optional["_ObjectType"]] _QuadPatternType = Tuple[ Optional["_SubjectType"], Optional["_PredicateType"], Optional["_ObjectType"], - Optional["Graph"], + Optional["_ContextType"], +] +_QuadPathPatternType = Tuple[ + Optional["_SubjectType"], + Path, + Optional["_ObjectType"], + Optional["_ContextType"], +] +_TripleOrQuadPatternType = Union["_TriplePatternType", "_QuadPatternType"] +_TripleOrQuadPathPatternType = Union["_TriplePathPatternType", "_QuadPathPatternType"] +_TripleSelectorType = Tuple[ + Optional["_SubjectType"], + Optional[Union["Path", "_PredicateType"]], + Optional["_ObjectType"], +] +_QuadSelectorType = Tuple[ + Optional["_SubjectType"], + Optional[Union["Path", "_PredicateType"]], + Optional["_ObjectType"], + Optional["_ContextType"], ] +_TripleOrQuadSelectorType = Union["_TripleSelectorType", "_QuadSelectorType"] +_TriplePathType = Tuple["_SubjectType", Path, "_ObjectType"] +_TripleOrTriplePathType = Union["_TripleType", "_TriplePathType"] +# _QuadPathType = Tuple["_SubjectType", Path, "_ObjectType", "_ContextType"] +# _QuadOrQuadPathType = Union["_QuadType", "_QuadPathType"] + _GraphT = TypeVar("_GraphT", bound="Graph") +_ConjunctiveGraphT = TypeVar("_ConjunctiveGraphT", bound="ConjunctiveGraph") +_DatasetT = TypeVar("_DatasetT", bound="Dataset") assert Literal # avoid warning assert Namespace # avoid warning @@ -325,17 +378,34 @@ "UnSupportedAggregateOperation", "ReadOnlyGraphAggregate", "BatchAddGraph", - "_TriplePatternType", - "_TripleType", - "_SubjectType", + "_ConjunctiveGraphT", + "_ContextIdentifierType", + "_DatasetT", + "_GraphT", "_ObjectType", - "_PredicateType", - "_QuadPatternType", "_OptionalIdentifiedQuadType", "_OptionalQuadType", + "_PredicateType", + "_QuadPathPatternType", + "_QuadPatternType", + "_QuadSelectorType", "_QuadType", + "_SubjectType", + "_TripleOrOptionalQuadType", + "_TripleOrTriplePathType", + "_TripleOrQuadPathPatternType", + "_TripleOrQuadPatternType", + "_TripleOrQuadSelectorType", + "_TriplePathPatternType", + "_TriplePathType", + "_TriplePatternType", + "_TripleSelectorType", + "_TripleType", ] +# : Transitive closure arg type. +_TCArgT = TypeVar("_TCArgT") + class Graph(Node): """An RDF Graph @@ -363,7 +433,7 @@ class Graph(Node): def __init__( self, store: Union[Store, str] = "default", - identifier: Optional[Union[IdentifiedNode, str]] = None, + identifier: Optional[Union[_ContextIdentifierType, str]] = None, namespace_manager: Optional[NamespaceManager] = None, base: Optional[str] = None, bind_namespaces: "_NamespaceSetString" = "core", @@ -404,13 +474,13 @@ def namespace_manager(self) -> NamespaceManager: return self.__namespace_manager @namespace_manager.setter - def namespace_manager(self, nm: NamespaceManager): + def namespace_manager(self, nm: NamespaceManager) -> None: self.__namespace_manager = nm - def __repr__(self): + def __repr__(self) -> str: return "" % (self.identifier, type(self)) - def __str__(self): + def __str__(self) -> str: if isinstance(self.identifier, URIRef): return ( "%s a rdfg:Graph;rdflib:storage " + "[a rdflib:Store;rdfs:label '%s']." @@ -420,26 +490,26 @@ def __str__(self): "[a rdfg:Graph;rdflib:storage " + "[a rdflib:Store;rdfs:label '%s']]." ) % self.store.__class__.__name__ - def toPython(self): # noqa: N802 + def toPython(self: _GraphT) -> _GraphT: # noqa: N802 return self - def destroy(self, configuration): + def destroy(self: _GraphT, configuration: str) -> _GraphT: """Destroy the store identified by ``configuration`` if supported""" self.__store.destroy(configuration) return self # Transactional interfaces (optional) - def commit(self): + def commit(self: _GraphT) -> _GraphT: """Commits active transactions""" self.__store.commit() return self - def rollback(self): + def rollback(self: _GraphT) -> _GraphT: """Rollback active transactions""" self.__store.rollback() return self - def open(self, configuration, create=False): + def open(self, configuration: str, create: bool = False) -> Optional[int]: """Open the graph store Might be necessary for stores that require opening a connection to a @@ -447,7 +517,7 @@ def open(self, configuration, create=False): """ return self.__store.open(configuration, create) - def close(self, commit_pending_transaction=False): + def close(self, commit_pending_transaction: bool = False) -> None: """Close the graph store Might be necessary for stores that require closing a connection to a @@ -455,7 +525,7 @@ def close(self, commit_pending_transaction=False): """ return self.__store.close(commit_pending_transaction=commit_pending_transaction) - def add(self, triple: "_TripleType"): + def add(self: _GraphT, triple: "_TripleType") -> _GraphT: """Add a triple with self as context""" s, p, o = triple assert isinstance(s, Node), "Subject %s must be an rdflib term" % (s,) @@ -464,7 +534,7 @@ def add(self, triple: "_TripleType"): self.__store.add((s, p, o), self, quoted=False) return self - def addN(self, quads: Iterable["_QuadType"]): # noqa: N802 + def addN(self: _GraphT, quads: Iterable["_QuadType"]) -> _GraphT: # noqa: N802 """Add a sequence of triple with context""" self.__store.addN( @@ -476,7 +546,7 @@ def addN(self, quads: Iterable["_QuadType"]): # noqa: N802 ) return self - def remove(self, triple): + def remove(self: _GraphT, triple: "_TriplePatternType") -> _GraphT: """Remove a triple from the graph If the triple does not provide a context attribute, removes the triple @@ -495,33 +565,21 @@ def triples( @overload def triples( self, - triple: Tuple[Optional["_SubjectType"], Path, Optional["_ObjectType"]], - ) -> Generator[Tuple["_SubjectType", Path, "_ObjectType"], None, None]: + triple: "_TriplePathPatternType", + ) -> Generator["_TriplePathType", None, None]: ... @overload def triples( self, - triple: Tuple[ - Optional["_SubjectType"], - Union[None, Path, "_PredicateType"], - Optional["_ObjectType"], - ], - ) -> Generator[ - Tuple["_SubjectType", Union["_PredicateType", Path], "_ObjectType"], None, None - ]: + triple: "_TripleSelectorType", + ) -> Generator["_TripleOrTriplePathType", None, None]: ... def triples( self, - triple: Tuple[ - Optional["_SubjectType"], - Union[None, Path, "_PredicateType"], - Optional["_ObjectType"], - ], - ) -> Generator[ - Tuple["_SubjectType", Union["_PredicateType", Path], "_ObjectType"], None, None - ]: + triple: "_TripleSelectorType", + ) -> Generator["_TripleOrTriplePathType", None, None]: """Generator over the triple store Returns triples that match the given triple pattern. If triple pattern @@ -607,28 +665,29 @@ def __getitem__(self, item): "You can only index a graph by a single rdflib term or path, or a slice of rdflib terms." ) - def __len__(self): + def __len__(self) -> int: """Returns the number of triples in the graph If context is specified then the number of triples in the context is returned instead. """ - return self.__store.__len__(context=self) + # type error: Unexpected keyword argument "context" for "__len__" of "Store" + return self.__store.__len__(context=self) # type: ignore[call-arg] def __iter__(self) -> Generator["_TripleType", None, None]: """Iterates over all triples in the store""" return self.triples((None, None, None)) - def __contains__(self, triple): + def __contains__(self, triple: _TriplePatternType) -> bool: """Support for 'triple in graph' syntax""" for triple in self.triples(triple): return True return False - def __hash__(self): + def __hash__(self) -> int: return hash(self.identifier) - def __cmp__(self, other): + def __cmp__(self, other) -> int: if other is None: return -1 elif isinstance(other, Graph): @@ -641,23 +700,23 @@ def __cmp__(self, other): # equivalent to None (if compared to it)? return 1 - def __eq__(self, other): + def __eq__(self, other) -> bool: return isinstance(other, Graph) and self.identifier == other.identifier - def __lt__(self, other): + def __lt__(self, other) -> bool: return (other is None) or ( isinstance(other, Graph) and self.identifier < other.identifier ) - def __le__(self, other): + def __le__(self, other: Graph) -> bool: return self < other or self == other - def __gt__(self, other): + def __gt__(self, other) -> bool: return (isinstance(other, Graph) and self.identifier > other.identifier) or ( other is not None ) - def __ge__(self, other): + def __ge__(self, other: Graph) -> bool: return self > other or self == other def __iadd__(self: "_GraphT", other: Iterable["_TripleType"]) -> "_GraphT": @@ -712,7 +771,7 @@ def __sub__(self, other: "Graph") -> "Graph": retval.add(x) return retval - def __xor__(self, other): + def __xor__(self, other: "Graph") -> "Graph": """Set-theoretic XOR. BNode IDs are not changed.""" return (self - other) + (other - self) @@ -722,7 +781,9 @@ def __xor__(self, other): # Conv. methods - def set(self, triple): + def set( + self: _GraphT, triple: Tuple[_SubjectType, _PredicateType, _ObjectType] + ) -> _GraphT: """Convenience method to update the value of object Remove any existing triples for subject and predicate before adding @@ -876,16 +937,75 @@ def predicate_objects( ) raise - def triples_choices(self, triple, context=None): + def triples_choices( + self, + triple: Union[ + Tuple[List["_SubjectType"], "_PredicateType", "_ObjectType"], + Tuple["_SubjectType", List["_PredicateType"], "_ObjectType"], + Tuple["_SubjectType", "_PredicateType", List["_ObjectType"]], + ], + context: Optional["_ContextType"] = None, + ) -> Generator[_TripleType, None, None]: subject, predicate, object_ = triple + # type error: Argument 1 to "triples_choices" of "Store" has incompatible type "Tuple[Union[List[Node], Node], Union[Node, List[Node]], Union[Node, List[Node]]]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" + # type error note: unpacking discards type info for (s, p, o), cg in self.store.triples_choices( - (subject, predicate, object_), context=self + (subject, predicate, object_), context=self # type: ignore[arg-type] ): yield s, p, o + @overload def value( - self, subject=None, predicate=RDF.value, object=None, default=None, any=True - ): + self, + subject: None = ..., + predicate: None = ..., + object: Optional[_ObjectType] = ..., + default: Optional[Node] = ..., + any: bool = ..., + ) -> None: + ... + + @overload + def value( + self, + subject: Optional[_SubjectType] = ..., + predicate: None = ..., + object: None = ..., + default: Optional[Node] = ..., + any: bool = ..., + ) -> None: + ... + + @overload + def value( + self, + subject: None = ..., + predicate: Optional[_PredicateType] = ..., + object: None = ..., + default: Optional[Node] = ..., + any: bool = ..., + ) -> None: + ... + + @overload + def value( + self, + subject: Optional[_SubjectType] = ..., + predicate: Optional[_PredicateType] = ..., + object: Optional[_ObjectType] = ..., + default: Optional[Node] = ..., + any: bool = ..., + ) -> Optional[Node]: + ... + + def value( + self, + subject: Optional[_SubjectType] = None, + predicate: Optional[_PredicateType] = RDF.value, + object: Optional[_ObjectType] = None, + default: Optional[Node] = None, + any: bool = True, + ) -> Optional[Node]: """Get a value for a pair of two criteria Exactly one of subject, predicate, object must be None. Useful if one @@ -942,7 +1062,7 @@ def value( pass return retval - def items(self, list): + def items(self, list: Node) -> Generator[Node, None, None]: """Generator over all items in the resource specified by list list is an RDF collection. @@ -952,12 +1072,18 @@ def items(self, list): item = self.value(list, RDF.first) if item is not None: yield item - list = self.value(list, RDF.rest) + # type error: Incompatible types in assignment (expression has type "Optional[Node]", variable has type "Node") + list = self.value(list, RDF.rest) # type: ignore[assignment] if list in chain: raise ValueError("List contains a recursive rdf:rest reference") chain.add(list) - def transitiveClosure(self, func, arg, seen=None): # noqa: N802 + def transitiveClosure( # noqa: N802 + self, + func: Callable[[_TCArgT, "Graph"], Iterable[_TCArgT]], + arg: _TCArgT, + seen: Optional[Dict[_TCArgT, int]] = None, + ): # noqa: N802 """ Generates transitive closure of a user-defined function against the graph @@ -1014,7 +1140,12 @@ def transitiveClosure(self, func, arg, seen=None): # noqa: N802 for rt_2 in self.transitiveClosure(func, rt, seen): yield rt_2 - def transitive_objects(self, subject, predicate, remember=None): + def transitive_objects( + self, + subject: Optional[_SubjectType], + predicate: Optional[_PredicateType], + remember: Optional[Dict[Optional[_SubjectType], int]] = None, + ) -> Generator[Optional[_SubjectType], None, None]: """Transitively generate objects for the ``predicate`` relationship Generated objects belong to the depth first transitive closure of the @@ -1030,7 +1161,12 @@ def transitive_objects(self, subject, predicate, remember=None): for o in self.transitive_objects(object, predicate, remember): yield o - def transitive_subjects(self, predicate, object, remember=None): + def transitive_subjects( + self, + predicate: Optional[_PredicateType], + object: Optional[_ObjectType], + remember: Optional[Dict[Optional[_ObjectType], int]] = None, + ) -> Generator[Optional[_ObjectType], None, None]: """Transitively generate subjects for the ``predicate`` relationship Generated subjects belong to the depth first transitive closure of the @@ -1046,13 +1182,19 @@ def transitive_subjects(self, predicate, object, remember=None): for s in self.transitive_subjects(predicate, subject, remember): yield s - def qname(self, uri): + def qname(self, uri: str) -> str: return self.namespace_manager.qname(uri) - def compute_qname(self, uri, generate=True): + def compute_qname(self, uri: str, generate: bool = True) -> Tuple[str, URIRef, str]: return self.namespace_manager.compute_qname(uri, generate) - def bind(self, prefix, namespace, override=True, replace=False) -> None: + def bind( + self, + prefix: Optional[str], + namespace: Any, # noqa: F811 + override: bool = True, + replace: bool = False, + ) -> None: """Bind prefix to namespace If override is True will bind namespace to given prefix even @@ -1074,19 +1216,24 @@ def bind(self, prefix, namespace, override=True, replace=False) -> None: prefix, namespace, override=override, replace=replace ) - def namespaces(self): + def namespaces(self) -> Generator[Tuple[str, URIRef], None, None]: """Generator over all the prefix, namespace tuples""" - for prefix, namespace in self.namespace_manager.namespaces(): + for prefix, namespace in self.namespace_manager.namespaces(): # noqa: F402 yield prefix, namespace - def absolutize(self, uri, defrag=1): + def absolutize(self, uri: str, defrag: int = 1) -> URIRef: """Turn uri into an absolute URI if it's not one already""" return self.namespace_manager.absolutize(uri, defrag) # no destination and non-None positional encoding @overload def serialize( - self, destination: None, format: str, base: Optional[str], encoding: str, **args + self, + destination: None, + format: str, + base: Optional[str], + encoding: str, + **args: Any, ) -> bytes: ... @@ -1099,7 +1246,7 @@ def serialize( base: Optional[str] = ..., *, encoding: str, - **args, + **args: Any, ) -> bytes: ... @@ -1111,7 +1258,7 @@ def serialize( format: str = ..., base: Optional[str] = ..., encoding: None = ..., - **args, + **args: Any, ) -> str: ... @@ -1123,7 +1270,7 @@ def serialize( format: str = ..., base: Optional[str] = ..., encoding: Optional[str] = ..., - **args, + **args: Any, ) -> "Graph": ... @@ -1135,18 +1282,18 @@ def serialize( format: str = ..., base: Optional[str] = ..., encoding: Optional[str] = ..., - **args, + **args: Any, ) -> Union[bytes, str, "Graph"]: ... def serialize( - self, + self: _GraphT, destination: Optional[Union[str, pathlib.PurePath, IO[bytes]]] = None, format: str = "turtle", base: Optional[str] = None, encoding: Optional[str] = None, **args: Any, - ) -> Union[bytes, str, "Graph"]: + ) -> Union[bytes, str, _GraphT]: """ Serialize the graph. @@ -1214,7 +1361,12 @@ def serialize( serializer.serialize(stream, encoding=encoding, **args) return self - def print(self, format="turtle", encoding="utf-8", out=None): + def print( + self, + format: str = "turtle", + encoding: str = "utf-8", + out: Optional[TextIO] = None, + ) -> None: print( self.serialize(None, format=format, encoding=encoding).decode(encoding), file=out, @@ -1231,8 +1383,8 @@ def parse( location: Optional[str] = None, file: Optional[Union[BinaryIO, TextIO]] = None, data: Optional[Union[str, bytes]] = None, - **args, - ): + **args: Any, + ) -> "Graph": """ Parse an RDF source adding the resulting triples to the Graph. @@ -1345,13 +1497,13 @@ def parse( def query( self, - query_object, + query_object: Union[str, Query], processor: Union[str, query.Processor] = "sparql", result: Union[str, Type[query.Result]] = "sparql", - initNs=None, # noqa: N803 - initBindings=None, + initNs: Optional[Mapping[str, Any]] = None, # noqa: N803 + initBindings: Optional[Mapping[Variable, Identifier]] = None, use_store_provided: bool = True, - **kwargs, + **kwargs: Any, ) -> query.Result: """ Query this graph. @@ -1376,8 +1528,7 @@ def query( query_object, initNs, initBindings, - # type error: Argument 4 to "query" of "Store" has incompatible type "Union[Literal['__UNION__'], Node]"; expected "Identifier" - self.default_union and "__UNION__" or self.identifier, # type: ignore[arg-type] + self.default_union and "__UNION__" or self.identifier, **kwargs, ) except NotImplementedError: @@ -1392,13 +1543,13 @@ def query( def update( self, - update_object, - processor="sparql", - initNs=None, # noqa: N803 - initBindings=None, - use_store_provided=True, - **kwargs, - ): + update_object: Union[Update, str], + processor: Union[str, rdflib.query.UpdateProcessor] = "sparql", + initNs: Optional[Mapping[str, Any]] = None, # noqa: N803 + initBindings: Optional[Mapping[Variable, Identifier]] = None, + use_store_provided: bool = True, + **kwargs: Any, + ) -> None: """Update this graph with the given update query.""" initBindings = initBindings or {} # noqa: N806 initNs = initNs or dict(self.namespaces()) # noqa: N806 @@ -1420,11 +1571,12 @@ def update( return processor.update(update_object, initBindings, initNs, **kwargs) - def n3(self): + def n3(self) -> str: """Return an n3 identifier for the Graph""" - return "[%s]" % self.identifier.n3() + # type error: "IdentifiedNode" has no attribute "n3" + return "[%s]" % self.identifier.n3() # type: ignore[attr-defined] - def __reduce__(self): + def __reduce__(self) -> Tuple[Type[Graph], Tuple[Store, _ContextIdentifierType]]: return ( Graph, ( @@ -1433,7 +1585,7 @@ def __reduce__(self): ), ) - def isomorphic(self, other): + def isomorphic(self, other: Graph) -> bool: """ does a very basic check if these graphs are the same If no BNodes are involved, this is accurate. @@ -1454,7 +1606,7 @@ def isomorphic(self, other): # TODO: very well could be a false positive at this point yet. return True - def connected(self): + def connected(self) -> bool: """Check if the Graph is connected The Graph is considered undirectional. @@ -1491,12 +1643,12 @@ def connected(self): else: return False - def all_nodes(self): + def all_nodes(self) -> Set[Node]: res = set(self.objects()) res.update(self.subjects()) return res - def collection(self, identifier): + def collection(self, identifier: _SubjectType) -> Collection: """Create a new ``Collection`` instance. Parameters: @@ -1516,7 +1668,7 @@ def collection(self, identifier): return Collection(self, identifier) - def resource(self, identifier): + def resource(self, identifier: Union[Node, str]) -> Resource: """Create a new ``Resource`` instance. Parameters: @@ -1537,20 +1689,32 @@ def resource(self, identifier): identifier = URIRef(identifier) return Resource(self, identifier) - def _process_skolem_tuples(self, target, func): + def _process_skolem_tuples( + self, target: Graph, func: Callable[[_TripleType], _TripleType] + ) -> None: for t in self.triples((None, None, None)): target.add(func(t)) - def skolemize(self, new_graph=None, bnode=None, authority=None, basepath=None): - def do_skolemize(bnode, t): + def skolemize( + self, + new_graph: Optional[Graph] = None, + bnode: Optional[BNode] = None, + authority: Optional[str] = None, + basepath: Optional[str] = None, + ) -> Graph: + def do_skolemize(bnode: BNode, t: _TripleType) -> _TripleType: (s, p, o) = t if s == bnode: + if TYPE_CHECKING: + assert isinstance(s, BNode) s = s.skolemize(authority=authority, basepath=basepath) if o == bnode: + if TYPE_CHECKING: + assert isinstance(o, BNode) o = o.skolemize(authority=authority, basepath=basepath) return s, p, o - def do_skolemize2(t): + def do_skolemize2(t: _TripleType) -> _TripleType: (s, p, o) = t if isinstance(s, BNode): s = s.skolemize(authority=authority, basepath=basepath) @@ -1563,31 +1727,42 @@ def do_skolemize2(t): if bnode is None: self._process_skolem_tuples(retval, do_skolemize2) elif isinstance(bnode, BNode): - self._process_skolem_tuples(retval, lambda t: do_skolemize(bnode, t)) + # type error: Argument 1 to "do_skolemize" has incompatible type "Optional[BNode]"; expected "BNode" + self._process_skolem_tuples(retval, lambda t: do_skolemize(bnode, t)) # type: ignore[arg-type] return retval - def de_skolemize(self, new_graph=None, uriref=None): - def do_de_skolemize(uriref, t): + def de_skolemize( + self, new_graph: Optional[Graph] = None, uriref: Optional[URIRef] = None + ) -> Graph: + def do_de_skolemize(uriref: URIRef, t: _TripleType) -> _TripleType: (s, p, o) = t if s == uriref: + if TYPE_CHECKING: + assert isinstance(s, URIRef) s = s.de_skolemize() if o == uriref: + if TYPE_CHECKING: + assert isinstance(o, URIRef) o = o.de_skolemize() return s, p, o - def do_de_skolemize2(t): + def do_de_skolemize2(t: _TripleType) -> _TripleType: (s, p, o) = t if RDFLibGenid._is_rdflib_skolem(s): - s = RDFLibGenid(s).de_skolemize() + # type error: Argument 1 to "RDFLibGenid" has incompatible type "Node"; expected "str" + s = RDFLibGenid(s).de_skolemize() # type: ignore[arg-type] elif Genid._is_external_skolem(s): - s = Genid(s).de_skolemize() + # type error: Argument 1 to "Genid" has incompatible type "Node"; expected "str" + s = Genid(s).de_skolemize() # type: ignore[arg-type] if RDFLibGenid._is_rdflib_skolem(o): - o = RDFLibGenid(o).de_skolemize() + # type error: Argument 1 to "RDFLibGenid" has incompatible type "Node"; expected "str" + o = RDFLibGenid(o).de_skolemize() # type: ignore[arg-type] elif Genid._is_external_skolem(o): - o = Genid(o).de_skolemize() + # type error: Argument 1 to "Genid" has incompatible type "Node"; expected "str" + o = Genid(o).de_skolemize() # type: ignore[arg-type] return s, p, o @@ -1596,11 +1771,12 @@ def do_de_skolemize2(t): if uriref is None: self._process_skolem_tuples(retval, do_de_skolemize2) elif isinstance(uriref, Genid): - self._process_skolem_tuples(retval, lambda t: do_de_skolemize(uriref, t)) + # type error: Argument 1 to "do_de_skolemize" has incompatible type "Optional[URIRef]"; expected "URIRef" + self._process_skolem_tuples(retval, lambda t: do_de_skolemize(uriref, t)) # type: ignore[arg-type] return retval - def cbd(self, resource): + def cbd(self, resource: _SubjectType) -> Graph: """Retrieves the Concise Bounded Description of a Resource from a Graph Concise Bounded Description (CBD) is defined in [1] as: @@ -1631,7 +1807,7 @@ def cbd(self, resource): """ subgraph = Graph() - def add_to_cbd(uri): + def add_to_cbd(uri: _SubjectType) -> None: for s, p, o in self.triples((uri, None, None)): subgraph.add((s, p, o)) # recurse 'down' through ll Blank Nodes @@ -1683,11 +1859,11 @@ def __init__( ) self.context_aware = True self.default_union = True # Conjunctive! - self.default_context = Graph( + self.default_context: _ContextType = Graph( store=self.store, identifier=identifier or BNode(), base=default_graph_base ) - def __str__(self): + def __str__(self) -> str: pattern = ( "[a rdflib:ConjunctiveGraph;rdflib:storage " "[a rdflib:Store;rdfs:label '%s']]" @@ -1697,22 +1873,17 @@ def __str__(self): @overload def _spoc( self, - triple_or_quad: Union[ - Tuple[ - Optional["_SubjectType"], - Optional["_PredicateType"], - Optional["_ObjectType"], - Optional[Any], - ], - "_TriplePatternType", - ], + triple_or_quad: "_QuadType", default: bool = False, - ) -> Tuple[ - Optional["_SubjectType"], - Optional["_PredicateType"], - Optional["_ObjectType"], - Optional[Graph], - ]: + ) -> "_QuadType": + ... + + @overload + def _spoc( + self, + triple_or_quad: Union["_TripleType", "_OptionalQuadType"], + default: bool = False, + ) -> "_OptionalQuadType": ... @overload @@ -1723,26 +1894,35 @@ def _spoc( ) -> Tuple[None, None, None, Optional[Graph]]: ... + @overload def _spoc( self, - triple_or_quad: Optional[ - Union[ - Tuple[ - Optional["_SubjectType"], - Optional["_PredicateType"], - Optional["_ObjectType"], - Optional[Any], - ], - "_TriplePatternType", - ] - ], + triple_or_quad: Optional[_TripleOrQuadPatternType], default: bool = False, - ) -> Tuple[ - Optional["_SubjectType"], - Optional["_PredicateType"], - Optional["_ObjectType"], - Optional[Graph], - ]: + ) -> "_QuadPatternType": + ... + + @overload + def _spoc( + self, + triple_or_quad: _TripleOrQuadSelectorType, + default: bool = False, + ) -> _QuadSelectorType: + ... + + @overload + def _spoc( + self, + triple_or_quad: Optional[_TripleOrQuadSelectorType], + default: bool = False, + ) -> _QuadSelectorType: + ... + + def _spoc( + self, + triple_or_quad: Optional[_TripleOrQuadSelectorType], + default: bool = False, + ) -> _QuadSelectorType: """ helper method for having methods that support either triples or quads @@ -1751,13 +1931,15 @@ def _spoc( return (None, None, None, self.default_context if default else None) if len(triple_or_quad) == 3: c = self.default_context if default else None + # type error: Too many values to unpack (3 expected, 4 provided) (s, p, o) = triple_or_quad # type: ignore[misc] elif len(triple_or_quad) == 4: + # type error: Need more than 3 values to unpack (4 expected) (s, p, o, c) = triple_or_quad # type: ignore[misc] c = self._graph(c) return s, p, o, c - def __contains__(self, triple_or_quad): + def __contains__(self, triple_or_quad: _TripleOrQuadPatternType) -> bool: """Support for 'triple/quad in graph' syntax""" s, p, o, c = self._spoc(triple_or_quad) for t in self.triples((s, p, o), context=c): @@ -1765,12 +1947,9 @@ def __contains__(self, triple_or_quad): return False def add( - self, - triple_or_quad: Union[ - Tuple["_SubjectType", "_PredicateType", "_ObjectType", Optional[Any]], - "_TripleType", - ], - ) -> "ConjunctiveGraph": + self: _ConjunctiveGraphT, + triple_or_quad: _TripleOrOptionalQuadType, + ) -> _ConjunctiveGraphT: """ Add a triple or quad to the store. @@ -1786,14 +1965,16 @@ def add( return self @overload - def _graph(self, c: Union[Graph, Node, str]) -> Graph: + def _graph(self, c: Union[Graph, _ContextIdentifierType, str]) -> Graph: ... @overload def _graph(self, c: None) -> None: ... - def _graph(self, c: Optional[Union[Graph, Node, str]]) -> Optional[Graph]: + def _graph( + self, c: Optional[Union[Graph, _ContextIdentifierType, str]] + ) -> Optional[Graph]: if c is None: return None if not isinstance(c, Graph): @@ -1801,7 +1982,9 @@ def _graph(self, c: Optional[Union[Graph, Node, str]]) -> Optional[Graph]: else: return c - def addN(self, quads: Iterable["_QuadType"]): # noqa: N802 + def addN( # noqa: N802 + self: _ConjunctiveGraphT, quads: Iterable["_QuadType"] + ) -> _ConjunctiveGraphT: """Add a sequence of triples with context""" self.store.addN( @@ -1809,7 +1992,8 @@ def addN(self, quads: Iterable["_QuadType"]): # noqa: N802 ) return self - def remove(self, triple_or_quad): + # type error: Argument 1 of "remove" is incompatible with supertype "Graph"; supertype defines the argument type as "Tuple[Optional[Node], Optional[Node], Optional[Node]]" + def remove(self: _ConjunctiveGraphT, triple_or_quad: _TripleOrOptionalQuadType) -> _ConjunctiveGraphT: # type: ignore[override] """ Removes a triple or quads @@ -1823,7 +2007,35 @@ def remove(self, triple_or_quad): self.store.remove((s, p, o), context=c) return self - def triples(self, triple_or_quad, context=None): + @overload + def triples( + self, + triple_or_quad: "_TripleOrQuadPatternType", + context: Optional[_ContextType] = ..., + ) -> Generator["_TripleType", None, None]: + ... + + @overload + def triples( + self, + triple_or_quad: "_TripleOrQuadPathPatternType", + context: Optional[_ContextType] = ..., + ) -> Generator["_TriplePathType", None, None]: + ... + + @overload + def triples( + self, + triple_or_quad: _TripleOrQuadSelectorType, + context: Optional[_ContextType] = ..., + ) -> Generator["_TripleOrTriplePathType", None, None]: + ... + + def triples( + self, + triple_or_quad: _TripleOrQuadSelectorType, + context: Optional[_ContextType] = None, + ) -> Generator["_TripleOrTriplePathType", None, None]: """ Iterate over all the triples in the entire conjunctive graph @@ -1853,12 +2065,7 @@ def triples(self, triple_or_quad, context=None): yield s, p, o def quads( - self, - triple_or_quad: Union[ - "_TriplePatternType", - "_QuadPatternType", - None, - ] = None, + self, triple_or_quad: Optional[_TripleOrQuadPatternType] = None ) -> Generator[_OptionalQuadType, None, None]: """Iterate over all the quads in the entire conjunctive graph""" @@ -1868,7 +2075,15 @@ def quads( for ctx in cg: yield s, p, o, ctx - def triples_choices(self, triple, context=None): + def triples_choices( + self, + triple: Union[ + Tuple[List["_SubjectType"], "_PredicateType", "_ObjectType"], + Tuple["_SubjectType", List["_PredicateType"], "_ObjectType"], + Tuple["_SubjectType", "_PredicateType", List["_ObjectType"]], + ], + context: Optional["_ContextType"] = None, + ) -> Generator[_TripleType, None, None]: """Iterate over all the triples in the entire conjunctive graph""" s, p, o = triple if context is None: @@ -1876,17 +2091,18 @@ def triples_choices(self, triple, context=None): context = self.default_context else: context = self._graph(context) - - for (s1, p1, o1), cg in self.store.triples_choices((s, p, o), context=context): + # type error: Argument 1 to "triples_choices" of "Store" has incompatible type "Tuple[Union[List[Node], Node], Union[Node, List[Node]], Union[Node, List[Node]]]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" + # type error note: unpacking discards type info + for (s1, p1, o1), cg in self.store.triples_choices((s, p, o), context=context): # type: ignore[arg-type] yield s1, p1, o1 - def __len__(self): + def __len__(self) -> int: """Number of triples in the entire conjunctive graph""" return self.store.__len__() def contexts( self, triple: Optional["_TripleType"] = None - ) -> Generator[Graph, None, None]: + ) -> Generator["_ContextType", None, None]: """Iterate over all contexts in the graph If triple is specified, iterate over all contexts the triple is in. @@ -1901,13 +2117,13 @@ def contexts( # type error: Statement is unreachable yield self.get_context(context) # type: ignore[unreachable] - def get_graph(self, identifier: Union[URIRef, BNode]) -> Union[Graph, None]: + def get_graph(self, identifier: "_ContextIdentifierType") -> Union[Graph, None]: """Returns the graph identified by given identifier""" return [x for x in self.contexts() if x.identifier == identifier][0] def get_context( self, - identifier: Optional[Union[Node, str]], + identifier: Optional[Union["_ContextIdentifierType", str]], quoted: bool = False, base: Optional[str] = None, ) -> Graph: @@ -1916,10 +2132,13 @@ def get_context( identifier must be a URIRef or BNode. """ return Graph( - store=self.store, identifier=identifier, namespace_manager=self.namespace_manager, base=base # type: ignore[arg-type] + store=self.store, + identifier=identifier, + namespace_manager=self.namespace_manager, + base=base, ) - def remove_context(self, context): + def remove_context(self, context: "_ContextType") -> None: """Removes the given context from the graph""" self.store.remove((None, None, None), context) @@ -1940,8 +2159,8 @@ def parse( location: Optional[str] = None, file: Optional[Union[BinaryIO, TextIO]] = None, data: Optional[Union[str, bytes]] = None, - **args, - ): + **args: Any, + ) -> "Graph": """ Parse source adding the resulting triples to its own context (sub graph of this graph). @@ -1980,7 +2199,7 @@ def parse( # TODO: FIXME: This should not return context, but self. return context - def __reduce__(self): + def __reduce__(self) -> Tuple[Type[Graph], Tuple[Store, _ContextIdentifierType]]: return ConjunctiveGraph, (self.store, self.identifier) @@ -2110,7 +2329,12 @@ class Dataset(ConjunctiveGraph): .. versionadded:: 4.0 """ - def __init__(self, store="default", default_union=False, default_graph_base=None): + def __init__( + self, + store: Union[Store, str] = "default", + default_union: bool = False, + default_graph_base: Optional[str] = None, + ): super(Dataset, self).__init__(store=store, identifier=None) if not self.store.graph_aware: @@ -2123,22 +2347,32 @@ def __init__(self, store="default", default_union=False, default_graph_base=None self.default_union = default_union - def __str__(self): + def __str__(self) -> str: pattern = ( "[a rdflib:Dataset;rdflib:storage " "[a rdflib:Store;rdfs:label '%s']]" ) return pattern % self.store.__class__.__name__ - def __reduce__(self): + # type error: Return type "Tuple[Type[Dataset], Tuple[Store, bool]]" of "__reduce__" incompatible with return type "Tuple[Type[Graph], Tuple[Store, IdentifiedNode]]" in supertype "ConjunctiveGraph" + # type error: Return type "Tuple[Type[Dataset], Tuple[Store, bool]]" of "__reduce__" incompatible with return type "Tuple[Type[Graph], Tuple[Store, IdentifiedNode]]" in supertype "Graph" + def __reduce__(self) -> Tuple[Type[Dataset], Tuple[Store, bool]]: # type: ignore[override] return (type(self), (self.store, self.default_union)) - def __getstate__(self): + def __getstate__(self) -> Tuple[Store, _ContextIdentifierType, _ContextType, bool]: return self.store, self.identifier, self.default_context, self.default_union - def __setstate__(self, state): - self.store, self.identifier, self.default_context, self.default_union = state + def __setstate__( + self, state: Tuple[Store, _ContextIdentifierType, _ContextType, bool] + ) -> None: + # type error: Property "store" defined in "Graph" is read-only + # type error: Property "identifier" defined in "Graph" is read-only + self.store, self.identifier, self.default_context, self.default_union = state # type: ignore[misc] - def graph(self, identifier=None, base=None): + def graph( + self, + identifier: Optional[Union[_ContextIdentifierType, _ContextType, str]] = None, + base: Optional[str] = None, + ) -> Graph: if identifier is None: from rdflib.term import _SKOLEM_DEFAULT_AUTHORITY, rdflib_skolem_genid @@ -2157,25 +2391,31 @@ def graph(self, identifier=None, base=None): def parse( self, - source=None, - publicID=None, # noqa: N803 - format=None, - location=None, - file=None, - data=None, - **args, - ): + source: Optional[ + Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath] + ] = None, + publicID: Optional[str] = None, # noqa: N803 + format: Optional[str] = None, + location: Optional[str] = None, + file: Optional[Union[BinaryIO, TextIO]] = None, + data: Optional[Union[str, bytes]] = None, + **args: Any, + ) -> "Graph": c = ConjunctiveGraph.parse( self, source, publicID, format, location, file, data, **args ) self.graph(c) return c - def add_graph(self, g): + def add_graph( + self, g: Optional[Union[_ContextIdentifierType, _ContextType, str]] + ) -> Graph: """alias of graph for consistency""" return self.graph(g) - def remove_graph(self, g): + def remove_graph( + self: _DatasetT, g: Optional[Union[_ContextIdentifierType, _ContextType, str]] + ) -> _DatasetT: if not isinstance(g, Graph): g = self.get_context(g) @@ -2186,7 +2426,9 @@ def remove_graph(self, g): self.store.add_graph(self.default_context) return self - def contexts(self, triple=None): + def contexts( + self, triple: Optional["_TripleType"] = None + ) -> Generator["_ContextType", None, None]: default = False for c in super(Dataset, self).contexts(triple): default |= c.identifier == DATASET_DEFAULT_GRAPH_ID @@ -2198,12 +2440,7 @@ def contexts(self, triple=None): # type error: Return type "Generator[Tuple[Node, Node, Node, Optional[Node]], None, None]" of "quads" incompatible with return type "Generator[Tuple[Node, Node, Node, Optional[Graph]], None, None]" in supertype "ConjunctiveGraph" def quads( # type: ignore[override] - self, - quad: Union[ - "_TriplePatternType", - "_QuadPatternType", - None, - ] = None, + self, quad: Optional["_TripleOrQuadPatternType"] = None ) -> Generator[_OptionalIdentifiedQuadType, None, None]: for s, p, o, c in super(Dataset, self).quads(quad): # type error: Item "None" of "Optional[Graph]" has no attribute "identifier" @@ -2229,10 +2466,14 @@ class QuotedGraph(Graph): such as implication and other such processing. """ - def __init__(self, store, identifier): + def __init__( + self, + store: Union[Store, str], + identifier: Optional[Union[_ContextIdentifierType, str]], + ): super(QuotedGraph, self).__init__(store, identifier) - def add(self, triple: "_TripleType"): + def add(self: _GraphT, triple: "_TripleType") -> _GraphT: """Add a triple with self as context""" s, p, o = triple assert isinstance(s, Node), "Subject %s must be an rdflib term" % (s,) @@ -2242,7 +2483,7 @@ def add(self, triple: "_TripleType"): self.store.add((s, p, o), self, quoted=True) return self - def addN(self, quads: Iterable["_QuadType"]) -> "QuotedGraph": # noqa: N802 + def addN(self: _GraphT, quads: Iterable["_QuadType"]) -> _GraphT: # noqa: N802 """Add a sequence of triple with context""" self.store.addN( @@ -2254,12 +2495,14 @@ def addN(self, quads: Iterable["_QuadType"]) -> "QuotedGraph": # noqa: N802 ) return self - def n3(self): + def n3(self) -> str: """Return an n3 identifier for the Graph""" - return "{%s}" % self.identifier.n3() + # type error: "IdentifiedNode" has no attribute "n3" + return "{%s}" % self.identifier.n3() # type: ignore[attr-defined] - def __str__(self): - identifier = self.identifier.n3() + def __str__(self) -> str: + # type error: "IdentifiedNode" has no attribute "n3" + identifier = self.identifier.n3() # type: ignore[attr-defined] label = self.store.__class__.__name__ pattern = ( "{this rdflib.identifier %s;rdflib:storage " @@ -2267,7 +2510,7 @@ def __str__(self): ) return pattern % (identifier, label) - def __reduce__(self): + def __reduce__(self) -> Tuple[Type[Graph], Tuple[Store, _ContextIdentifierType]]: return QuotedGraph, (self.store, self.identifier) @@ -2287,7 +2530,7 @@ class Seq(object): 'implementation' of a sequence in RDF terms. """ - def __init__(self, graph, subject): + def __init__(self, graph: Graph, subject: _SubjectType): """Parameters: - graph: @@ -2299,40 +2542,43 @@ def __init__(self, graph, subject): creates this instance! """ + self._list: List[Tuple[int, _ObjectType]] _list = self._list = list() LI_INDEX = URIRef(str(RDF) + "_") # noqa: N806 for (p, o) in graph.predicate_objects(subject): - if p.startswith(LI_INDEX): # != RDF.Seq: # - i = int(p.replace(LI_INDEX, "")) + # type error: "Node" has no attribute "startswith" + if p.startswith(LI_INDEX): # type: ignore[attr-defined] # != RDF.Seq: + # type error: "Node" has no attribute "replace" + i = int(p.replace(LI_INDEX, "")) # type: ignore[attr-defined] _list.append((i, o)) # here is the trick: the predicates are _1, _2, _3, etc. Ie, # by sorting the keys (by integer) we have what we want! _list.sort() - def toPython(self): # noqa: N802 + def toPython(self) -> "Seq": # noqa: N802 return self - def __iter__(self): + def __iter__(self) -> Generator[_ObjectType, None, None]: """Generator over the items in the Seq""" for _, item in self._list: yield item - def __len__(self): + def __len__(self) -> int: """Length of the Seq""" return len(self._list) - def __getitem__(self, index): + def __getitem__(self, index) -> _ObjectType: """Item given by index from the Seq""" index, item = self._list.__getitem__(index) return item class ModificationException(Exception): - def __init__(self): + def __init__(self) -> None: pass - def __str__(self): + def __str__(self) -> str: return ( "Modifications and transactional operations not allowed on " "ReadOnlyGraphAggregate instances" @@ -2340,10 +2586,10 @@ def __str__(self): class UnSupportedAggregateOperation(Exception): - def __init__(self): + def __init__(self) -> None: pass - def __str__(self): + def __str__(self) -> str: return "This operation is not supported by ReadOnlyGraphAggregate " "instances" @@ -2354,7 +2600,7 @@ class ReadOnlyGraphAggregate(ConjunctiveGraph): ConjunctiveGraph over an explicit subset of the entire store. """ - def __init__(self, graphs, store="default"): + def __init__(self, graphs: List[Graph], store: Union[str, Store] = "default"): if store is not None: super(ReadOnlyGraphAggregate, self).__init__(store) Graph.__init__(self, store) @@ -2367,38 +2613,68 @@ def __init__(self, graphs, store="default"): ), "graphs argument must be a list of Graphs!!" self.graphs = graphs - def __repr__(self): + def __repr__(self) -> str: return "" % len(self.graphs) - def destroy(self, configuration): + def destroy(self, configuration: str) -> NoReturn: raise ModificationException() # Transactional interfaces (optional) - def commit(self): + def commit(self) -> NoReturn: raise ModificationException() - def rollback(self): + def rollback(self) -> NoReturn: raise ModificationException() - def open(self, configuration, create=False): + def open(self, configuration: str, create: bool = False) -> None: # TODO: is there a use case for this method? for graph in self.graphs: - graph.open(self, configuration, create) + # type error: Too many arguments for "open" of "Graph" + # type error: Argument 1 to "open" of "Graph" has incompatible type "ReadOnlyGraphAggregate"; expected "str" [arg-type] + # type error: Argument 2 to "open" of "Graph" has incompatible type "str"; expected "bool" [arg-type] + graph.open(self, configuration, create) # type: ignore[call-arg, arg-type] - def close(self): + # type error: Signature of "close" incompatible with supertype "Graph" + def close(self) -> None: # type: ignore[override] for graph in self.graphs: graph.close() - def add(self, triple): + def add(self, triple: _TripleOrOptionalQuadType) -> NoReturn: raise ModificationException() - def addN(self, quads): # noqa: N802 + def addN(self, quads: Iterable["_QuadType"]) -> NoReturn: # noqa: N802 raise ModificationException() - def remove(self, triple): + # type error: Argument 1 of "remove" is incompatible with supertype "Graph"; supertype defines the argument type as "Tuple[Optional[Node], Optional[Node], Optional[Node]]" + def remove(self, triple: _TripleOrOptionalQuadType) -> NoReturn: # type: ignore[override] raise ModificationException() - def triples(self, triple): + # type error: Signature of "triples" incompatible with supertype "ConjunctiveGraph" + @overload # type: ignore[override] + def triples( + self, + triple: "_TriplePatternType", + ) -> Generator["_TripleType", None, None]: + ... + + @overload + def triples( + self, + triple: "_TriplePathPatternType", + ) -> Generator["_TriplePathType", None, None]: + ... + + @overload + def triples( + self, + triple: "_TripleSelectorType", + ) -> Generator["_TripleOrTriplePathType", None, None]: + ... + + def triples( + self, + triple: "_TripleSelectorType", + ) -> Generator["_TripleOrTriplePathType", None, None]: s, p, o = triple for graph in self.graphs: if isinstance(p, Path): @@ -2408,23 +2684,35 @@ def triples(self, triple): for s1, p1, o1 in graph.triples((s, p, o)): yield s1, p1, o1 - def __contains__(self, triple_or_quad): + def __contains__(self, triple_or_quad: _TripleOrQuadPatternType) -> bool: context = None if len(triple_or_quad) == 4: - context = triple_or_quad[3] + # type error: Tuple index out of range + context = triple_or_quad[3] # type: ignore [misc] for graph in self.graphs: if context is None or graph.identifier == context.identifier: if triple_or_quad[:3] in graph: return True return False - def quads(self, triple_or_quad): + # type error: Signature of "quads" incompatible with supertype "ConjunctiveGraph" + def quads( # type: ignore[override] + self, triple_or_quad: _TripleOrQuadSelectorType + ) -> Generator[ + Tuple[ + "_SubjectType", Union[Path, "_PredicateType"], "_ObjectType", "_ContextType" + ], + None, + None, + ]: """Iterate over all the quads in the entire aggregate graph""" c = None if len(triple_or_quad) == 4: - s, p, o, c = triple_or_quad + # type error: Need more than 3 values to unpack (4 expected) + s, p, o, c = triple_or_quad # type: ignore[misc] else: - s, p, o = triple_or_quad + # type error: Too many values to unpack (3 expected, 4 provided) + s, p, o = triple_or_quad # type: ignore[misc] if c is not None: for graph in [g for g in self.graphs if g == c]: @@ -2435,13 +2723,13 @@ def quads(self, triple_or_quad): for s1, p1, o1 in graph.triples((s, p, o)): yield s1, p1, o1, graph - def __len__(self): + def __len__(self) -> int: return sum(len(g) for g in self.graphs) - def __hash__(self): + def __hash__(self) -> NoReturn: raise UnSupportedAggregateOperation() - def __cmp__(self, other): + def __cmp__(self, other) -> int: if other is None: return -1 elif isinstance(other, Graph): @@ -2451,57 +2739,89 @@ def __cmp__(self, other): else: return -1 - def __iadd__(self: "_GraphT", other: Iterable["_TripleType"]) -> "_GraphT": + def __iadd__(self: "_GraphT", other: Iterable["_TripleType"]) -> NoReturn: raise ModificationException() - def __isub__(self: "_GraphT", other: Iterable["_TripleType"]) -> "_GraphT": + def __isub__(self: "_GraphT", other: Iterable["_TripleType"]) -> NoReturn: raise ModificationException() # Conv. methods - def triples_choices(self, triple, context=None): + def triples_choices( + self, + triple: Union[ + Tuple[List["_SubjectType"], "_PredicateType", "_ObjectType"], + Tuple["_SubjectType", List["_PredicateType"], "_ObjectType"], + Tuple["_SubjectType", "_PredicateType", List["_ObjectType"]], + ], + context: Optional["_ContextType"] = None, + ) -> Generator[_TripleType, None, None]: subject, predicate, object_ = triple for graph in self.graphs: - choices = graph.triples_choices((subject, predicate, object_)) + # type error: Argument 1 to "triples_choices" of "Graph" has incompatible type "Tuple[Union[List[Node], Node], Union[Node, List[Node]], Union[Node, List[Node]]]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" + # type error note: unpacking discards type info + choices = graph.triples_choices((subject, predicate, object_)) # type: ignore[arg-type] for (s, p, o) in choices: yield s, p, o - def qname(self, uri): + def qname(self, uri: str) -> str: if hasattr(self, "namespace_manager") and self.namespace_manager: return self.namespace_manager.qname(uri) raise UnSupportedAggregateOperation() - def compute_qname(self, uri, generate=True): + def compute_qname(self, uri: str, generate: bool = True) -> Tuple[str, URIRef, str]: if hasattr(self, "namespace_manager") and self.namespace_manager: return self.namespace_manager.compute_qname(uri, generate) raise UnSupportedAggregateOperation() - def bind(self, prefix, namespace, override=True): + # type error: Signature of "bind" incompatible with supertype "Graph" + def bind( # type: ignore[override] + self, prefix: Optional[str], namespace: Any, override: bool = True # noqa: F811 + ) -> NoReturn: raise UnSupportedAggregateOperation() - def namespaces(self): + def namespaces(self) -> Generator[Tuple[str, URIRef], None, None]: if hasattr(self, "namespace_manager"): - for prefix, namespace in self.namespace_manager.namespaces(): + for prefix, namespace in self.namespace_manager.namespaces(): # noqa: F402 yield prefix, namespace else: for graph in self.graphs: for prefix, namespace in graph.namespaces(): yield prefix, namespace - def absolutize(self, uri, defrag=1): + def absolutize(self, uri: str, defrag: int = 1) -> NoReturn: raise UnSupportedAggregateOperation() - def parse(self, source, publicID=None, format=None, **args): # noqa: N803 + # type error: Signature of "parse" incompatible with supertype "ConjunctiveGraph" + def parse( # type: ignore[override] + self, + source: Optional[ + Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath] + ], + publicID: Optional[str] = None, + format: Optional[str] = None, + **args: Any, + ) -> NoReturn: # noqa: N803 raise ModificationException() - def n3(self): + def n3(self) -> NoReturn: raise UnSupportedAggregateOperation() - def __reduce__(self): + def __reduce__(self) -> NoReturn: raise UnSupportedAggregateOperation() -def _assertnode(*terms): +@overload +def _assertnode(*terms: Node) -> "te.Literal[True]": + ... + + +@overload +def _assertnode(*terms: Any) -> bool: + ... + + +def _assertnode(*terms: Any) -> bool: for t in terms: assert isinstance(t, Node), "Term %s must be an rdflib term" % (t,) return True @@ -2535,11 +2855,11 @@ def __init__(self, graph: Graph, batch_size: int = 1000, batch_addn: bool = Fals self.__batch_addn = batch_addn self.reset() - def reset(self): + def reset(self) -> BatchAddGraph: """ Manually clear the buffered triples and reset the count to zero """ - self.batch = [] + self.batch: List[_QuadType] = [] self.count = 0 return self @@ -2560,12 +2880,14 @@ def add( self.batch = [] self.count += 1 if len(triple_or_quad) == 3: - self.batch.append(triple_or_quad + self.__graph_tuple) + # type error: Argument 1 to "append" of "list" has incompatible type "Tuple[Node, ...]"; expected "Tuple[Node, Node, Node, Graph]" + self.batch.append(triple_or_quad + self.__graph_tuple) # type: ignore[arg-type] else: - self.batch.append(triple_or_quad) + # type error: Argument 1 to "append" of "list" has incompatible type "Union[Tuple[Node, Node, Node], Tuple[Node, Node, Node, Graph]]"; expected "Tuple[Node, Node, Node, Graph]" + self.batch.append(triple_or_quad) # type: ignore[arg-type] return self - def addN(self, quads: Iterable["_QuadType"]): # noqa: N802 + def addN(self, quads: Iterable["_QuadType"]) -> BatchAddGraph: # noqa: N802 if self.__batch_addn: for q in quads: self.add(q) @@ -2573,10 +2895,10 @@ def addN(self, quads: Iterable["_QuadType"]): # noqa: N802 self.graph.addN(quads) return self - def __enter__(self): + def __enter__(self) -> BatchAddGraph: self.reset() return self - def __exit__(self, *exc): + def __exit__(self, *exc) -> None: if exc[0] is None: self.graph.addN(self.batch) diff --git a/rdflib/plugins/parsers/hext.py b/rdflib/plugins/parsers/hext.py index ccfa49363..df4087d54 100644 --- a/rdflib/plugins/parsers/hext.py +++ b/rdflib/plugins/parsers/hext.py @@ -66,7 +66,8 @@ def _parse_hextuple(self, cg: ConjunctiveGraph, tup: List[Union[str, None]]): # 6 - context if tup[5] is not None: c = URIRef(tup[5]) - cg.add((s, p, o, c)) + # type error: Argument 1 to "add" of "ConjunctiveGraph" has incompatible type "Tuple[Union[URIRef, BNode], URIRef, Union[URIRef, BNode, Literal], URIRef]"; expected "Union[Tuple[Node, Node, Node], Tuple[Node, Node, Node, Optional[Graph]]]" + cg.add((s, p, o, c)) # type: ignore[arg-type] else: cg.add((s, p, o)) diff --git a/rdflib/plugins/stores/memory.py b/rdflib/plugins/stores/memory.py index 93c069710..2e96a6e6b 100644 --- a/rdflib/plugins/stores/memory.py +++ b/rdflib/plugins/stores/memory.py @@ -7,6 +7,7 @@ Dict, Generator, Iterator, + Mapping, Optional, Set, Tuple, @@ -244,9 +245,9 @@ def __contexts(self) -> Generator["_ContextType", None, None]: def query( # type: ignore[return] self, query: Union["Query", str], - initNs: Dict[str, str], # noqa: N803 - initBindings: Dict["Variable", "Identifier"], # noqa: N803 - queryGraph: "Identifier", # noqa: N803 + initNs: Mapping[str, Any], # noqa: N803 + initBindings: Mapping["Variable", "Identifier"], # noqa: N803 + queryGraph: "str", # noqa: N803 **kwargs: Any, ) -> "Result": super(SimpleMemory, self).query( @@ -256,9 +257,9 @@ def query( # type: ignore[return] def update( self, update: Union["Update", str], - initNs: Dict[str, str], # noqa: N803 - initBindings: Dict["Variable", "Identifier"], # noqa: N803 - queryGraph: "Identifier", # noqa: N803 + initNs: Mapping[str, Any], # noqa: N803 + initBindings: Mapping["Variable", "Identifier"], # noqa: N803 + queryGraph: "str", # noqa: N803 **kwargs: Any, ) -> None: super(SimpleMemory, self).update( @@ -720,19 +721,19 @@ def __contexts( def query( # type: ignore[return] self, query: Union["Query", str], - initNs: Dict[str, str], # noqa: N803 - initBindings: Dict["Variable", "Identifier"], # noqa: N803 - queryGraph: "Identifier", + initNs: Mapping[str, Any], # noqa: N803 + initBindings: Mapping["Variable", "Identifier"], # noqa: N803 + queryGraph: "str", **kwargs, ) -> "Result": super(Memory, self).query(query, initNs, initBindings, queryGraph, **kwargs) def update( self, - update: Union["Update", str], - initNs: Dict[str, str], # noqa: N803 - initBindings: Dict["Variable", "Identifier"], # noqa: N803 - queryGraph: "Identifier", + update: Union["Update", Any], + initNs: Mapping[str, Any], # noqa: N803 + initBindings: Mapping["Variable", "Identifier"], # noqa: N803 + queryGraph: "str", **kwargs, ) -> None: super(Memory, self).update(update, initNs, initBindings, queryGraph, **kwargs) diff --git a/rdflib/plugins/stores/sparqlstore.py b/rdflib/plugins/stores/sparqlstore.py index 0c0d24202..be01fbe97 100644 --- a/rdflib/plugins/stores/sparqlstore.py +++ b/rdflib/plugins/stores/sparqlstore.py @@ -16,6 +16,7 @@ Iterable, Iterator, List, + Mapping, Optional, Tuple, Union, @@ -191,7 +192,7 @@ def remove( # type: ignore[override] def update( # type: ignore[override] self, query: Union["Update", str], - initNs: Dict[str, str] = {}, # noqa: N803 + initNs: Dict[str, Any] = {}, # noqa: N803 initBindings: Dict["Variable", "Identifier"] = {}, queryGraph: "Identifier" = None, DEBUG: bool = False, @@ -203,7 +204,7 @@ def _query(self, *args: Any, **kwargs: Any) -> "Result": return super(SPARQLStore, self).query(*args, **kwargs) - def _inject_prefixes(self, query: str, extra_bindings: Dict[str, str]) -> str: + def _inject_prefixes(self, query: str, extra_bindings: Mapping[str, Any]) -> str: bindings = set(list(self.nsBindings.items()) + list(extra_bindings.items())) if not bindings: return query @@ -220,9 +221,9 @@ def _inject_prefixes(self, query: str, extra_bindings: Dict[str, str]) -> str: def query( # type: ignore[override] self, query: Union["Query", str], - initNs: Optional[Dict[str, str]] = None, # noqa: N803 - initBindings: Optional[Dict["Variable", "Identifier"]] = None, - queryGraph: Optional["Identifier"] = None, + initNs: Optional[Mapping[str, Any]] = None, # noqa: N803 + initBindings: Optional[Mapping["Variable", "Identifier"]] = None, + queryGraph: Optional["str"] = None, DEBUG: bool = False, ) -> "Result": self.debug = DEBUG @@ -821,9 +822,9 @@ def _update(self, update): def update( # type: ignore[override] self, query: Union["Update", str], - initNs: Dict[str, str] = {}, # noqa: N803 + initNs: Dict[str, Any] = {}, # noqa: N803 initBindings: Dict["Variable", "Identifier"] = {}, - queryGraph: Optional["Identifier"] = None, + queryGraph: Optional[str] = None, DEBUG: bool = False, ): """ diff --git a/rdflib/store.py b/rdflib/store.py index 3e9c6527a..7ddef8f62 100644 --- a/rdflib/store.py +++ b/rdflib/store.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import pickle from io import BytesIO from typing import ( @@ -12,7 +14,6 @@ Optional, Tuple, Union, - overload, ) from rdflib.events import Dispatcher, Event @@ -278,59 +279,17 @@ def remove( """Remove the set of triples matching the pattern from the store""" self.dispatcher.dispatch(TripleRemovedEvent(triple=triple, context=context)) - @overload - def triples_choices( - self, - triple: Tuple[List["_SubjectType"], "_PredicateType", "_ObjectType"], - context: Optional["_ContextType"] = None, - ) -> Generator[ - Tuple["_TripleType", Iterator[Optional["_ContextType"]]], - None, - None, - ]: - ... - - @overload - def triples_choices( - self, - triple: Tuple["_SubjectType", List["_PredicateType"], "_ObjectType"], - context: Optional["_ContextType"] = None, - ) -> Generator[ - Tuple[ - Tuple["_SubjectType", "_PredicateType", "_ObjectType"], - Iterator[Optional["_ContextType"]], - ], - None, - None, - ]: - ... - - @overload - def triples_choices( - self, - triple: Tuple["_SubjectType", "_PredicateType", List["_ObjectType"]], - context: Optional["_ContextType"] = None, - ) -> Generator[ - Tuple[ - Tuple["_SubjectType", "_PredicateType", "_ObjectType"], - Iterator[Optional["_ContextType"]], - ], - None, - None, - ]: - ... - def triples_choices( self, - triple: Tuple[ - Union["_SubjectType", List["_SubjectType"]], - Union["_PredicateType", List["_PredicateType"]], - Union["_ObjectType", List["_ObjectType"]], + triple: Union[ + Tuple[List["_SubjectType"], "_PredicateType", "_ObjectType"], + Tuple["_SubjectType", List["_PredicateType"], "_ObjectType"], + Tuple["_SubjectType", "_PredicateType", List["_ObjectType"]], ], context: Optional["_ContextType"] = None, ) -> Generator[ Tuple[ - Tuple["_SubjectType", "_PredicateType", "_ObjectType"], + _TripleType, Iterator[Optional["_ContextType"]], ], None, @@ -430,9 +389,9 @@ def contexts( def query( self, query: Union["Query", str], - initNs: Dict[str, str], # noqa: N803 - initBindings: Dict["Variable", "Identifier"], # noqa: N803 - queryGraph: "Identifier", # noqa: N803 + initNs: Mapping[str, Any], # noqa: N803 + initBindings: Mapping["Variable", "Identifier"], # noqa: N803 + queryGraph: str, # noqa: N803 **kwargs: Any, ) -> "Result": """ @@ -453,9 +412,9 @@ def query( def update( self, update: Union["Update", str], - initNs: Dict[str, str], # noqa: N803 - initBindings: Dict["Variable", "Identifier"], # noqa: N803 - queryGraph: "Identifier", # noqa: N803 + initNs: Mapping[str, Any], # noqa: N803 + initBindings: Mapping["Variable", "Identifier"], # noqa: N803 + queryGraph: str, # noqa: N803 **kwargs: Any, ) -> None: """ diff --git a/test/test_roundtrip.py b/test/test_roundtrip.py index 58d7480cb..9c310a334 100644 --- a/test/test_roundtrip.py +++ b/test/test_roundtrip.py @@ -271,7 +271,8 @@ def roundtrip( # type error: Incompatible types in assignment (expression has type "Node", variable has type "str") for s, p, o in c.triples((None, None, None)): # type: ignore[assignment] if type(o) == rdflib.Literal and o.datatype == XSD.string: - c.remove((s, p, o)) + # type error: Argument 1 to "remove" of "Graph" has incompatible type "Tuple[str, Node, Literal]"; expected "Tuple[Optional[Node], Optional[Node], Optional[Node]]" + c.remove((s, p, o)) # type: ignore[arg-type] # type error: Argument 1 to "add" of "Graph" has incompatible type "Tuple[str, Node, Literal]"; expected "Tuple[Node, Node, Node]" c.add((s, p, rdflib.Literal(str(o)))) # type: ignore[arg-type] diff --git a/test/test_serializers/test_serializer.py b/test/test_serializers/test_serializer.py index 1a61b1e4b..549a21beb 100644 --- a/test/test_serializers/test_serializer.py +++ b/test/test_serializers/test_serializer.py @@ -60,7 +60,8 @@ def test_rdf_type(format: str, tuple_index: int, is_keyword: bool) -> None: nodes = [NS.subj, NS.pred, NS.obj, NS.graph] nodes[tuple_index] = RDF.type quad = cast(Tuple[URIRef, URIRef, URIRef, URIRef], tuple(nodes)) - graph.add(quad) + # type error: Argument 1 to "add" of "ConjunctiveGraph" has incompatible type "Tuple[URIRef, URIRef, URIRef, URIRef]"; expected "Union[Tuple[Node, Node, Node], Tuple[Node, Node, Node, Optional[Graph]]]" + graph.add(quad) # type: ignore[arg-type] data = graph.serialize(format=format) logging.info("data = %s", data) assert NS in data diff --git a/test/utils/dawg_manifest.py b/test/utils/dawg_manifest.py index e03e0f027..ccb29b99b 100644 --- a/test/utils/dawg_manifest.py +++ b/test/utils/dawg_manifest.py @@ -151,6 +151,7 @@ def from_sources( def included(self) -> Generator["Manifest", None, None]: for includes in self.graph.objects(self.identifier, MF.include): for include in self.graph.items(includes): + assert isinstance(include, str) include_local_path = self.uri_mapper.to_local_path(include) yield from Manifest.from_sources( self.uri_mapper, @@ -166,6 +167,7 @@ def entires( ) -> Generator["ManifestEntryT", None, None]: for entries in self.graph.objects(self.identifier, MF.entries): for entry_iri in self.graph.items(entries): + assert isinstance(entry_iri, URIRef) entry = entry_type(self, entry_iri) if exclude is not None and entry.check_filters(exclude): continue diff --git a/test/utils/manifest.py b/test/utils/manifest.py index bdcfd1978..548aa7ed0 100644 --- a/test/utils/manifest.py +++ b/test/utils/manifest.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from test.utils.namespace import DAWGT, MF, QT, RDFT, UT from typing import Iterable, List, NamedTuple, Optional, Tuple, Union, cast @@ -7,8 +9,10 @@ logger = logging.getLogger(__name__) -ResultType = Union[Identifier, Tuple[Identifier, List[Tuple[Identifier, Identifier]]]] -GraphDataType = Union[List[Identifier], List[Tuple[Identifier, Identifier]]] +ResultType = Union[ + Identifier, Tuple[Optional[Node], List[Tuple[Optional[Node], Optional[Node]]]] +] +GraphDataType = Union[List[Optional[Node]], List[Tuple[Optional[Node], Optional[Node]]]] class RDFTest(NamedTuple): @@ -22,7 +26,9 @@ class RDFTest(NamedTuple): syntax: bool -def read_manifest(f, base=None, legacy=False) -> Iterable[Tuple[Node, URIRef, RDFTest]]: +def read_manifest(f, base=None, legacy=False) -> Iterable[Tuple[Node, Node, RDFTest]]: + """read a manifest file""" + def _str(x): if x is not None: return str(x) @@ -39,7 +45,7 @@ def _str(x): yield x for col in g.objects(m, MF.entries): - e: URIRef + e: Node for e in g.items(col): approved = ( @@ -81,22 +87,22 @@ def _str(x): # NOTE: Casting to identifier because g.objects return Node # but should probably return identifier instead. graphdata = list( - cast(Iterable[Identifier], g.objects(a, QT.graphData)) + cast(Iterable[Optional[Node]], g.objects(a, QT.graphData)) ) - res = g.value(e, MF.result) + res = cast(Optional[ResultType], g.value(e, MF.result)) elif _type in (MF.UpdateEvaluationTest, UT.UpdateEvaluationTest): a = g.value(e, MF.action) query = g.value(a, UT.request) data = g.value(a, UT.data) - graphdata = cast(List[Tuple[Identifier, Identifier]], []) + graphdata = cast(List[Tuple[Optional[Node], Optional[Node]]], []) for gd in g.objects(a, UT.graphData): graphdata.append( (g.value(gd, UT.graph), g.value(gd, RDFS.label)) ) r = g.value(e, MF.result) - resdata: Identifier = g.value(r, UT.data) - resgraphdata: List[Tuple[Identifier, Identifier]] = [] + resdata: Optional[Node] = g.value(r, UT.data) + resgraphdata: List[Tuple[Optional[Node], Optional[Node]]] = [] for gd in g.objects(r, UT.graphData): resgraphdata.append( (g.value(gd, UT.graph), g.value(gd, RDFS.label)) @@ -149,7 +155,7 @@ def _str(x): RDFT.TestTrixEval, ): query = g.value(e, MF.action) - res = g.value(e, MF.result) + res = cast(Identifier, g.value(e, MF.result)) syntax = _type in ( RDFT.TestTurtleEval, RDFT.TestTrigEval, @@ -162,6 +168,8 @@ def _str(x): print("I dont know DAWG Test Type %s" % _type) continue + assert isinstance(e, URIRef) + yield e, _type, RDFTest( e, _str(name), diff --git a/test/utils/sparql_checker.py b/test/utils/sparql_checker.py index 1266e3a40..d6c06eee4 100644 --- a/test/utils/sparql_checker.py +++ b/test/utils/sparql_checker.py @@ -12,7 +12,18 @@ from test.utils.iri import URIMapper from test.utils.namespace import MF, QT, UT from test.utils.result import ResultType, assert_bindings_collections_equal -from typing import Any, Callable, Dict, Generator, Optional, Set, Tuple, Type, Union +from typing import ( + Any, + Callable, + Dict, + Generator, + Optional, + Set, + Tuple, + Type, + Union, + cast, +) from urllib.parse import urljoin import pytest @@ -147,12 +158,19 @@ def __post_init__(self) -> None: if self.type_info.query_type is not None: assert self.result is not None - self.query = self.graph.value(self.action, self.type_info.query_property) + self.query = cast( + Optional[IdentifiedNode], + self.graph.value(self.action, self.type_info.query_property), + ) assert isinstance(self.query, URIRef) assert self.type_info.ns is not None - self.action_data = self.graph.value(self.action, self.type_info.ns.data) - self.expected_outcome = self.graph.value( - self.action, self.type_info.expected_outcome_property + self.action_data = cast( + Optional[IdentifiedNode], + self.graph.value(self.action, self.type_info.ns.data), + ) + self.expected_outcome = cast( + Optional[URIRef], + self.graph.value(self.action, self.type_info.expected_outcome_property), ) for action_graph_data_id in self.graph.objects( self.action, self.type_info.ns.graphData @@ -163,7 +181,10 @@ def __post_init__(self) -> None: self.action_graph_data = set() self.action_graph_data.add(graph_data) if isinstance(self.result, BNode): - self.result_data = self.graph.value(self.result, self.type_info.ns.data) + self.result_data = cast( + Optional[IdentifiedNode], + self.graph.value(self.result, self.type_info.ns.data), + ) else: self.result_data = self.result assert isinstance(self.result_data, URIRef)