import json
import logging
import os
from abc import ABC, abstractmethod
from typing import Any, Iterator, Literal, MutableMapping, Optional, Tuple, Union

import pystac
import requests
import siphon
import xncml
from requests.sessions import Session
from siphon.catalog import TDSCatalog, session_manager

from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate

LOGGER = logging.getLogger(__name__)

class GenericLoader(ABC):
    def __init__(self) -> None:
        self.links = []

    @abstractmethod
    def __iter__(self):
        """
        A generator that returns an item from the input. The item could be anything
        depending on the specific concrete implementation of this abstract class.
        """
        raise NotImplementedError

    @abstractmethod
    def reset(self):
        """Reset the internal state of the generator."""
        raise NotImplementedError
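
# Illustrative sketch (not part of the module): a minimal concrete ``GenericLoader``
# iterating over an in-memory list, to show the ``__iter__``/``reset`` contract the
# abstract class expects. The ``ListLoader`` name and its records are hypothetical.
#
#   class ListLoader(GenericLoader):
#       def __init__(self, records):
#           super().__init__()
#           self._records = list(records)
#
#       def __iter__(self):
#           # each record is assumed to be a (name, url, attributes) tuple
#           for name, url, attrs in self._records:
#               yield name, url, attrs
#
#       def reset(self):
#           pass  # nothing stateful to reset for an in-memory list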

class ErrorLoader(GenericLoader):
    def __init__(self):  # noqa
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError

class THREDDSCatalog(TDSCatalog):
    """
    Patch to apply a custom request session.

    Because :class:`TDSCatalog` automatically loads and parses the catalog as soon as
    ``__init__`` is called, we need to work around how the ``session`` attribute gets defined.
    """

    def __init__(self, catalog_url: str, session: Optional[Session] = None) -> None:
        self._session = session
        super().__init__(catalog_url)

    @property
    def session(self) -> Session:
        if self._session is None:
            self._session = session_manager.create_session()
        return self._session

    @session.setter
    def session(self, session: Session) -> None:
        pass  # ignore to bypass TDSCatalog.__init__ enforcing create_session!
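
# Usage sketch (assumption, not part of the module): reuse a pre-authenticated
# ``requests.Session`` so every catalog request carries the same credentials/headers.
# The URL and token below are placeholders.
#
#   import requests
#
#   session = requests.Session()
#   session.headers.update({"Authorization": "Bearer <token>"})
#   catalog = THREDDSCatalog("https://example.org/thredds/catalog/catalog.xml", session=session)
#   print(list(catalog.datasets))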

class THREDDSLoader(GenericLoader):
    def __init__(
        self,
        thredds_catalog_url: str,
        depth: Optional[int] = None,
        session: Optional[Session] = None,
    ) -> None:
        """Constructor

        :param thredds_catalog_url: the URL to the THREDDS catalog to ingest
        :type thredds_catalog_url: str
        :param depth: Maximum recursion depth for the class's generator. Setting 0 will return only datasets
            within the top-level catalog. If None (the default), the depth is set to 1000.
        :type depth: int, optional
        """
        super().__init__()
        self._max_depth = depth if depth is not None else 1000
        self._depth = 0
        self.thredds_catalog_URL = self.validate_catalog_url(thredds_catalog_url)
        self.catalog = TDSCatalog(self.thredds_catalog_URL)
        self.catalog_head = self.catalog
        self.links.append(self.magpie_collection_link())

    def validate_catalog_url(self, url: str) -> str:
        """Validate the user-provided catalog URL.

        :param url: URL to the THREDDS catalog
        :type url: str
        :raises RuntimeError: if the URL is invalid or contains query parameters.
        :return: a valid URL
        :rtype: str
        """
        if url_validate(url):
            if "?" in url:
                raise RuntimeError("THREDDS catalog URL should not contain query parameters")
        else:
            raise RuntimeError("Invalid URL")
        return url.replace(".html", ".xml") if url.endswith(".html") else url

    def magpie_collection_link(self) -> pystac.Link:
        """Creates a PySTAC Link for the collection that is used by Cowbird and Magpie.

        :return: A PySTAC Link
        :rtype: pystac.Link
        """
        url = self.thredds_catalog_URL
        parts = url.split("/")
        i = parts.index("catalog")
        service = parts[i - 1]
        path = "/".join(parts[i + 1 : -1])
        title = f"{service}:{path}"
        return pystac.Link(rel="source", target=url, media_type="text/xml", title=title)
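
    # Worked example (hypothetical URL): for
    # "https://example.org/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/catalog.xml",
    # ``parts.index("catalog")`` locates the "catalog" segment, so ``service`` is "thredds",
    # ``path`` is "birdhouse/testdata", and the link title becomes "thredds:birdhouse/testdata".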

    def reset(self):
        """Reset the generator."""
        self.catalog_head = self.catalog

    def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
        """Return a generator walking a THREDDS data catalog for datasets.

        :yield: Returns three quantities: name of the item, location of the item, and its attributes
        :rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
        """
        if self._depth > self._max_depth:
            return

        if self.catalog_head.datasets.items():
            for item_name, ds in self.catalog_head.datasets.items():
                attrs = self.extract_metadata(ds)
                filename = ds.url_path[ds.url_path.rfind("/") :]
                url = self.catalog_head.catalog_url[: self.catalog_head.catalog_url.rfind("/")] + filename
                yield item_name, url, attrs

        for name, ref in self.catalog_head.catalog_refs.items():
            self.catalog_head = ref.follow()
            self._depth += 1  # descend one level into the referenced catalog
            yield from self
            self._depth -= 1

    def __getitem__(self, dataset):
        return self.catalog.datasets[dataset]

    def extract_metadata(self, ds: siphon.catalog.Dataset) -> MutableMapping[str, Any]:
        """Request the NcML description of the dataset and return it as a CF-style attribute dictionary."""
        LOGGER.info("Requesting NcML dataset description")
        url = ds.access_urls["NCML"]
        r = requests.get(url)
        # Convert the NcML response to a CF-compliant dictionary
        attrs = xncml.Dataset.from_text(r.text).to_cf_dict()
        attrs["attributes"] = numpy_to_python_datatypes(attrs["attributes"])
        attrs["access_urls"] = ds.access_urls
        return attrs
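
# Usage sketch (assumption, not part of the module): walk a THREDDS catalog and inspect
# the NcML-derived attributes of each dataset. The URL is a placeholder, and ``depth=0``
# limits the crawl to the top-level catalog.
#
#   loader = THREDDSLoader("https://example.org/thredds/catalog/birdhouse/catalog.xml", depth=0)
#   for item_name, url, attrs in loader:
#       print(item_name, url, list(attrs["access_urls"]))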

class STACDirectoryLoader(GenericLoader):
    """
    Iterates through a directory structure looking for STAC Collections or Items.

    For each directory that gets crawled, if a file is named ``collection.json``, it is assumed to be a STAC
    Collection. All other ``.json`` files under the directory where ``collection.json`` was found are assumed
    to be STAC Items. These JSON STAC Items can be either at the same directory level as the STAC Collection,
    or under nested directories.

    Using the ``mode`` option, yielded results will be either the STAC Collections or the STAC Items.
    This allows the class to be used in conjunction (2 nested loops) to find collections and their underlying items.

    .. code-block:: python

        for _, collection_path, collection_json in STACDirectoryLoader(dir_path, mode="collection"):
            for item_name, item_path, item_json in STACDirectoryLoader(os.path.dirname(collection_path), mode="item"):
                ...  # do stuff

    For convenience, the ``prune`` option can be used to stop crawling deeper once a STAC Collection is found.
    Any collection files located further down the directory tree from where a top-most match was found will not
    be yielded. This can be useful to limit the search, or to ignore nested directories containing subsets of
    STAC Collections.
    """

    def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool = False) -> None:
        super().__init__()
        self.path = path
        self.iter = None
        self.prune = prune
        self.reset()
        self._collection_mode = mode == "collection"
        self._collection_name = "collection.json"

    def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
        """Return a generator that walks through a directory structure looking for STAC Collections or Items.

        :yield: Returns three quantities: name of the item, location of the item, and its attributes
        :rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
        """
        is_root = True
        for root, dirs, files in self.iter:
            # since there can only ever be one 'collection' file name in the same directory,
            # retrieve it directly instead of looping through all the other files
            if self._collection_mode and self._collection_name in files:
                if self.prune:  # stop the recursive search if requested
                    del dirs[:]
                col_path = os.path.join(root, self._collection_name)
                yield self._collection_name, col_path, self._load_json(col_path)
            # if a collection is found deeper than expected while parsing items,
            # drop the nested directories to avoid over-crawling nested collections
            elif not self._collection_mode and not is_root and self._collection_name in files:
                del dirs[:]
                continue
            is_root = False  # for the next loop iteration
            for name in files:
                if not self._collection_mode and self._is_item(name):
                    item_path = os.path.join(root, name)
                    yield name, item_path, self._load_json(item_path)

    def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
        """Return whether the file name corresponds to a STAC Item (any JSON file other than the collection file)."""
        name = os.path.split(path)[-1]
        return name != self._collection_name and os.path.splitext(name)[-1] in [".json", ".geojson"]

    def _load_json(self, path: Union[os.PathLike[str], str]) -> MutableMapping[str, Any]:
        """Load and return the JSON content of the given file."""
        with open(path, mode="r", encoding="utf-8") as file:
            return json.load(file)

    def reset(self):
        """Reset the generator to walk the directory from the top again."""
        self.iter = os.walk(self.path)
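
# Usage sketch (assumption, not part of the module): load every STAC Item found below a
# collection directory and turn it into a ``pystac.Item`` for further processing.
# ``collection_dir`` is a hypothetical local path.
#
#   collection_dir = "/data/stac/my-collection"
#   for item_name, item_path, item_json in STACDirectoryLoader(collection_dir, mode="item"):
#       item = pystac.Item.from_dict(item_json)
#       print(item.id, item_path)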

class STACLoader(GenericLoader):
    def __init__(self) -> None:
        super().__init__()

    def __iter__(self):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError


class GeoServerLoader(GenericLoader):
    def __init__(self) -> None:
        super().__init__()

    def __iter__(self):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError
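

if __name__ == "__main__":
    # Example only (not part of the library API): a minimal sketch that crawls a local
    # directory of STAC collections and their items, printing what would be ingested.
    # The directory layout is assumed to follow the conventions documented on
    # STACDirectoryLoader above; the command-line argument is optional.
    import sys

    directory = sys.argv[1] if len(sys.argv) > 1 else "."
    for col_name, col_path, col_json in STACDirectoryLoader(directory, mode="collection", prune=True):
        print(f"collection: {col_json.get('id', col_name)} ({col_path})")
        for item_name, item_path, item_json in STACDirectoryLoader(os.path.dirname(col_path), mode="item"):
            print(f"  item: {item_json.get('id', item_name)} ({item_path})")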