diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1d8c1c62..973581802 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,13 @@
 
 ## Development
 
+### Refactor
+- \[DWD Obs\] Make the download function more flexible using threadpool
+- \[DWD Obs\] Cleanup parser function
+
+### Fix
+- \[DWD Obs\] Reduce unnecessary file index calls during retrieval of data for stations with multiple files
+
 ## 0.100.0 - 2025-01-06
 
 ### Feature
diff --git a/tests/provider/dwd/observation/test_fileindex.py b/tests/provider/dwd/observation/test_fileindex.py
index ecf9a8310..578eb23ba 100644
--- a/tests/provider/dwd/observation/test_fileindex.py
+++ b/tests/provider/dwd/observation/test_fileindex.py
@@ -59,7 +59,7 @@ def test_create_file_list_for_dwd_server(default_settings):
         station_id="00003",
         dataset=DwdObservationMetadata.minute_10.temperature_air,
         period=Period.HISTORICAL,
-        date_range="19930428_19991231",
+        date_ranges=["19930428_19991231"],
         settings=default_settings,
     ).to_list()
     assert remote_file_path == [
diff --git a/wetterdienst/provider/dwd/observation/api.py b/wetterdienst/provider/dwd/observation/api.py
index 349450bc5..3f9929695 100644
--- a/wetterdienst/provider/dwd/observation/api.py
+++ b/wetterdienst/provider/dwd/observation/api.py
@@ -21,7 +21,7 @@ from wetterdienst.metadata.period import Period
 from wetterdienst.metadata.resolution import Resolution
 from wetterdienst.provider.dwd.observation.download import (
-    download_climate_observations_data_parallel,
+    download_climate_observations_data,
 )
 from wetterdienst.provider.dwd.observation.fileindex import (
     _create_file_index_for_dwd_server,
@@ -74,30 +74,31 @@ def _collect_station_parameter_or_dataset(
                 date_ranges = self._get_historical_date_ranges(
                     station_id, parameter_or_dataset, self.sr.stations.settings
                 )
-                for date_range in date_ranges:
-                    periods_and_date_ranges.append((period, date_range))
+                periods_and_date_ranges.append((period, date_ranges))
             else:
                 periods_and_date_ranges.append((period, None))
 
         parameter_data = []
-        for period, date_range in periods_and_date_ranges:
+        for period, date_ranges in periods_and_date_ranges:
             if period not in parameter_or_dataset.periods:
                 log.info(f"Skipping period {period} for {parameter_or_dataset.name}.")
                 continue
-            dataset_identifier = f"{parameter_or_dataset.resolution.value.name}/{parameter_or_dataset.name}/{station_id}/{period.value}/{date_range}"  # noqa: E501
+            dataset_identifier = (
+                f"{parameter_or_dataset.resolution.value.name}/{parameter_or_dataset.name}/{station_id}/{period.value}"  # noqa: E501
+            )
             log.info(f"Acquiring observation data for {dataset_identifier}.")
             remote_files = create_file_list_for_climate_observations(
                 station_id,
                 parameter_or_dataset,
                 period,
                 self.sr.stations.settings,
-                date_range,
+                date_ranges,
             )
             if remote_files.is_empty():
                 log.info(f"No files found for {dataset_identifier}. Station will be skipped.")
                 continue
-            filenames_and_files = download_climate_observations_data_parallel(remote_files, self.sr.stations.settings)
+            filenames_and_files = download_climate_observations_data(remote_files, self.sr.stations.settings)
             period_df = parse_climate_observations_data(filenames_and_files, parameter_or_dataset, period)
             parameter_data.append(period_df)
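
The api.py change above is the heart of the fix: historical date ranges are now batched into a single tuple per period, so the file index is consulted once per period rather than once per date range. A rough sketch of the reshaped flow, where `list_files` is a made-up stand-in for `create_file_list_for_climate_observations`:

```python
def list_files(station_id: str, period: str, date_ranges: list[str] | None) -> list[str]:
    # stand-in: pretend every date range resolves to one archive
    return [f"{station_id}_{period}_{dr}.zip" for dr in (date_ranges or ["latest"])]

periods_and_date_ranges = [
    ("historical", ["19930428_19991231", "20000101_20091231"]),  # one entry holding all ranges
    ("recent", None),  # non-historical periods carry no date ranges
]
for period, date_ranges in periods_and_date_ranges:
    print(list_files("00003", period, date_ranges))  # one lookup per period
```
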
diff --git a/wetterdienst/provider/dwd/observation/download.py b/wetterdienst/provider/dwd/observation/download.py
index 8b9756861..b46b78410 100644
--- a/wetterdienst/provider/dwd/observation/download.py
+++ b/wetterdienst/provider/dwd/observation/download.py
@@ -22,40 +22,22 @@
 log = logging.getLogger(__name__)
 
 
-def download_climate_observations_data_parallel(
+def download_climate_observations_data(
     remote_files: pl.Series,
     settings: Settings,
 ) -> list[tuple[str, BytesIO]]:
-    """
-    Wrapper for ``_download_dwd_data`` to provide a multiprocessing feature.
-
-    :param remote_files: List of requested files
-    :return: List of downloaded files
-    """
-    with ThreadPoolExecutor() as p:
-        files_in_bytes = p.map(
-            lambda file: _download_climate_observations_data(remote_file=file, settings=settings),
-            remote_files,
-        )
-
+    if len(remote_files) > 1:
+        with ThreadPoolExecutor() as p:
+            files_in_bytes = p.map(
+                lambda file: _download_climate_observations_data(remote_file=file, settings=settings),
+                remote_files,
+            )
+    else:
+        files_in_bytes = [_download_climate_observations_data(remote_file=remote_files[0], settings=settings)]
     return list(zip(remote_files, files_in_bytes))
 
 
 def _download_climate_observations_data(remote_file: str, settings: Settings) -> BytesIO:
-    """
-    This function downloads the station data for which the link is
-    provided by the 'select_dwd' function. It checks the shortened filepath (just
-    the zipfile) for its parameters, creates the full filepath and downloads the
-    file(s) according to the set up folder.
-
-    Args:
-        remote_file: contains path to file that should be downloaded
-        and the path to the folder to store the files
-
-    Returns:
-        stores data on local file system
-
-    """
     return BytesIO(__download_climate_observations_data(remote_file=remote_file, settings=settings))
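
The renamed downloader keeps the thread pool but now skips it when only one file is requested, avoiding executor startup overhead in the common single-file case. A minimal, self-contained sketch of that dispatch pattern, with `fetch` as a hypothetical stand-in for `_download_climate_observations_data`:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(url: str) -> bytes:  # hypothetical downloader stub
    return url.encode()

def download(urls: list[str]) -> list[tuple[str, bytes]]:
    if len(urls) > 1:
        # several files: fetch them concurrently
        with ThreadPoolExecutor() as pool:
            payloads = pool.map(fetch, urls)
    else:
        # single file: call directly, no pool needed
        payloads = [fetch(urls[0])]
    return list(zip(urls, payloads))

print(download(["https://example.com/a.zip"]))  # single file, no pool spun up
```
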
""" file_index = create_file_index_for_climate_observations(dataset, period, settings) - - file_index = file_index.collect() - file_index = file_index.filter(pl.col("station_id").eq(station_id)) - - if date_range: - file_index = file_index.filter(pl.col("date_range").eq(date_range)) - - return file_index.get_column("filename") + if date_ranges: + file_index = file_index.filter(pl.col("date_range").is_in(date_ranges)) + return file_index.collect().get_column("filename") def create_file_index_for_climate_observations( diff --git a/wetterdienst/provider/dwd/observation/parser.py b/wetterdienst/provider/dwd/observation/parser.py index e649d2c2a..dcd631641 100644 --- a/wetterdienst/provider/dwd/observation/parser.py +++ b/wetterdienst/provider/dwd/observation/parser.py @@ -3,31 +3,24 @@ from __future__ import annotations import logging -from io import BytesIO, StringIO from typing import TYPE_CHECKING import polars as pl +from polars import selectors as cs -from wetterdienst.metadata.columns import Columns from wetterdienst.metadata.period import Period from wetterdienst.metadata.resolution import Resolution -from wetterdienst.provider.dwd.metadata import DatetimeFormat from wetterdienst.provider.dwd.observation.metadata import ( DwdObservationMetadata, ) if TYPE_CHECKING: + from io import BytesIO + from wetterdienst.core.timeseries.metadata import DatasetModel log = logging.getLogger(__name__) -# Parameter names used to create full 1 minute precipitation dataset wherever those -# columns are missing (which is the case for non historical data) -PRECIPITATION_PARAMETERS = ( - DwdObservationMetadata.minute_1.precipitation.precipitation_height_droplet.name_original, - DwdObservationMetadata.minute_1.precipitation.precipitation_height_rocker.name_original, -) - PRECIPITATION_MINUTE_1_QUALITY = DwdObservationMetadata.minute_1.precipitation.quality DROPPABLE_PARAMETERS = { @@ -53,26 +46,19 @@ "strahlungstemperatur", } -DATE_FIELDS_REGULAR = { - Columns.DATE.value, - Columns.START_DATE.value, - Columns.END_DATE.value, -} - -DWD_TO_ENGLISH_COLUMNS_MAPPING = { - "stations_id": Columns.STATION_ID.value, - "mess_datum": Columns.DATE.value, - "von_datum": Columns.START_DATE.value, - "bis_datum": Columns.END_DATE.value, - "mess_datum_beginn": Columns.START_DATE.value, - "mess_datum_ende": Columns.END_DATE.value, - "stationshoehe": Columns.HEIGHT.value, - "geobreite": Columns.LATITUDE.value, - "geogr.breite": Columns.LATITUDE.value, - "geolaenge": Columns.LONGITUDE.value, - "geogr.laenge": Columns.LONGITUDE.value, - "stationsname": Columns.NAME.value, - "bundesland": Columns.STATE.value, +COLUMNS_MAPPING = { + "stations_id": "station_id", + "mess_datum": "date", + "stationshoehe": "height", + "geobreite": "latitude", + "geogr.breite": "latitude", + "geolaenge": "longitude", + "geogr.laenge": "longitude", + # those two are only used in the historical 1 minute precipitation data + # we keep start_date and end_date as it is internally named date + # after exploding the date ranges + "mess_datum_beginn": "date", + "mess_datum_ende": "end_date", } @@ -81,21 +67,11 @@ def parse_climate_observations_data( dataset: DatasetModel, period: Period, ) -> pl.LazyFrame: + """This function parses the climate observations data from the DWD. + There's a special case for subdaily wind extremes, as they are stored in two files. + Both files are parsed and joined together afterwards. """ - This function is used to read the station data from given bytes object. - The filename is required to defined if and where an error happened. 
diff --git a/wetterdienst/provider/dwd/observation/parser.py b/wetterdienst/provider/dwd/observation/parser.py
index e649d2c2a..dcd631641 100644
--- a/wetterdienst/provider/dwd/observation/parser.py
+++ b/wetterdienst/provider/dwd/observation/parser.py
@@ -3,31 +3,24 @@
 from __future__ import annotations
 
 import logging
-from io import BytesIO, StringIO
 from typing import TYPE_CHECKING
 
 import polars as pl
+from polars import selectors as cs
 
-from wetterdienst.metadata.columns import Columns
 from wetterdienst.metadata.period import Period
 from wetterdienst.metadata.resolution import Resolution
-from wetterdienst.provider.dwd.metadata import DatetimeFormat
 from wetterdienst.provider.dwd.observation.metadata import (
     DwdObservationMetadata,
 )
 
 if TYPE_CHECKING:
+    from io import BytesIO
+
     from wetterdienst.core.timeseries.metadata import DatasetModel
 
 log = logging.getLogger(__name__)
 
-# Parameter names used to create full 1 minute precipitation dataset wherever those
-# columns are missing (which is the case for non historical data)
-PRECIPITATION_PARAMETERS = (
-    DwdObservationMetadata.minute_1.precipitation.precipitation_height_droplet.name_original,
-    DwdObservationMetadata.minute_1.precipitation.precipitation_height_rocker.name_original,
-)
-
 PRECIPITATION_MINUTE_1_QUALITY = DwdObservationMetadata.minute_1.precipitation.quality
 
 DROPPABLE_PARAMETERS = {
@@ -53,26 +46,19 @@
     "strahlungstemperatur",
 }
 
-DATE_FIELDS_REGULAR = {
-    Columns.DATE.value,
-    Columns.START_DATE.value,
-    Columns.END_DATE.value,
-}
-
-DWD_TO_ENGLISH_COLUMNS_MAPPING = {
-    "stations_id": Columns.STATION_ID.value,
-    "mess_datum": Columns.DATE.value,
-    "von_datum": Columns.START_DATE.value,
-    "bis_datum": Columns.END_DATE.value,
-    "mess_datum_beginn": Columns.START_DATE.value,
-    "mess_datum_ende": Columns.END_DATE.value,
-    "stationshoehe": Columns.HEIGHT.value,
-    "geobreite": Columns.LATITUDE.value,
-    "geogr.breite": Columns.LATITUDE.value,
-    "geolaenge": Columns.LONGITUDE.value,
-    "geogr.laenge": Columns.LONGITUDE.value,
-    "stationsname": Columns.NAME.value,
-    "bundesland": Columns.STATE.value,
+COLUMNS_MAPPING = {
+    "stations_id": "station_id",
+    "mess_datum": "date",
+    "stationshoehe": "height",
+    "geobreite": "latitude",
+    "geogr.breite": "latitude",
+    "geolaenge": "longitude",
+    "geogr.laenge": "longitude",
+    # these two are only used in the historical 1 minute precipitation data;
+    # mess_datum_beginn maps directly to date because that column holds the
+    # actual timestamps once the date ranges have been exploded
+    "mess_datum_beginn": "date",
+    "mess_datum_ende": "end_date",
 }
 
 
@@ -81,21 +67,11 @@ def parse_climate_observations_data(
     dataset: DatasetModel,
     period: Period,
 ) -> pl.LazyFrame:
+    """This function parses the climate observations data from the DWD.
+    There's a special case for subdaily wind extremes, as they are stored in two files.
+    Both files are parsed and joined together afterwards.
     """
-    This function is used to read the station data from given bytes object.
-    The filename is required to defined if and where an error happened.
-    Args:
-        filenames_and_files: list of tuples of a filename and its local stored file
-            that should be read
-        dataset: enumeration of parameter used to correctly parse the date field
-        resolution: enumeration of time resolution used to correctly parse the
-            date field
-        period: enumeration of period of data
-    Returns:
-        polars.LazyFrame with requested data, for different station ids the data is
-        still put into one DataFrame
-    """
-    if dataset.resolution.value == Resolution.SUBDAILY and dataset == DwdObservationMetadata.subdaily.wind_extreme:
+    if dataset == DwdObservationMetadata.subdaily.wind_extreme:
         data = [
             _parse_climate_observations_data(filename_and_file, dataset, period)
             for filename_and_file in filenames_and_files
         ]
         try:
             return pl.concat(data, how="align")
         except ValueError:
             return data[0]
     else:
-        if len(filenames_and_files) > 1:
-            raise ValueError("only one file expected")
-        return _parse_climate_observations_data(filenames_and_files[0], dataset, period)
+        data = []
+        for filename_and_file in filenames_and_files:
+            data.append(_parse_climate_observations_data(filename_and_file, dataset, period))
+        return pl.concat(data)
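
For the subdaily wind extremes, the FX3 and FX6 files are parsed separately and then merged; `pl.concat(..., how="align")` joins frames on their shared columns. A toy illustration (frames and values are made up; the real frames also carry station and quality columns):

```python
import polars as pl

fx3 = pl.LazyFrame({"date": [1, 2], "fx_911_3": [10.1, 11.4]})
fx6 = pl.LazyFrame({"date": [1, 2], "fx_911_6": [12.0, 13.5]})
# align-concat performs an outer join on the shared "date" column
print(pl.concat([fx3, fx6], how="align").collect())  # one row per date, both value columns
```
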
 
 
 def _parse_climate_observations_data(
@@ -117,114 +94,87 @@
     dataset: DatasetModel,
     period: Period,
 ) -> pl.LazyFrame:
-    """
-    A wrapping function that only handles data for one station id. The files passed to
-    it are thus related to this id. This is important for storing the data locally as
-    the DataFrame that is stored should obviously only handle one station at a time.
-    Args:
-        filename_and_file: the files belonging to one station
-        resolution: enumeration of time resolution used to correctly parse the
-            date field
-    Returns:
-        polars.LazyFrame with data from that station, acn be empty if no data is
-        provided or local file is not found or has no data in it
+    """This function parses the climate observations data from the DWD.
+    There's a special case for 1 minute precipitation data with an early return statement as the function
+    _transform_minute_1_precipitation_historical already parses the data and especially the dates.
     """
     filename, file = filename_and_file
-
     try:
         df = pl.read_csv(
-            source=StringIO(file.read().decode("latin1").replace(" ", "")),
+            source=file,
             separator=";",
             null_values=["-999"],
-            infer_schema_length=0,
-        ).lazy()
+        )
+        df = df.lazy()
     except pl.exceptions.SchemaError:
         log.warning(f"The file representing {filename} could not be parsed and is skipped.")
         return pl.LazyFrame()
     except ValueError:
         log.warning(f"The file representing {filename} is None and is skipped.")
         return pl.LazyFrame()
-
+    df = df.with_columns(cs.string().str.strip_chars())
+    df = df.with_columns(cs.string().replace("-999", None), cs.numeric().replace(-999, None))
     # Column names contain spaces, so strip them away.
     df = df.rename(mapping=lambda col: col.strip().lower())
-
     # End of record (EOR) has no value, so drop it right away.
     df = df.drop(*DROPPABLE_PARAMETERS, strict=False)
-
+    # Assign meaningful column names (baseline).
+    df = df.rename(mapping=lambda col: COLUMNS_MAPPING.get(col, col))
     if dataset == DwdObservationMetadata.minute_1.precipitation:
-        # Need to unfold historical data, as it is encoded in its run length e.g.
-        # from time X to time Y precipitation is 0
         if period == Period.HISTORICAL:
-            df = df.with_columns(
-                pl.col("mess_datum_beginn").cast(str).str.to_datetime(DatetimeFormat.YMDHM.value, time_zone="UTC"),
-                pl.col("mess_datum_ende").cast(str).str.to_datetime(DatetimeFormat.YMDHM.value, time_zone="UTC"),
-            )
-            df = df.with_columns(
-                pl.datetime_ranges(pl.col("mess_datum_beginn"), pl.col("mess_datum_ende"), interval="1m").alias(
-                    "mess_datum",
-                ),
-            )
-            df = df.drop(
-                "mess_datum_beginn",
-                "mess_datum_ende",
-            )
-            # Expand dataframe over calculated date ranges -> one datetime per row
-            df = df.explode("mess_datum")
+            # this is a special case, we return as the dates are already parsed and everything is done
+            return _transform_minute_1_precipitation_historical(df)
         else:
-            df = df.with_columns(
-                [pl.all(), *[pl.lit(None, pl.String).alias(par) for par in PRECIPITATION_PARAMETERS]],
+            missing_parameters = (
+                DwdObservationMetadata.minute_1.precipitation.precipitation_height_droplet.name_original,
+                DwdObservationMetadata.minute_1.precipitation.precipitation_height_rocker.name_original,
             )
-            df = df.with_columns(
-                pl.col("mess_datum").cast(str).str.to_datetime(DatetimeFormat.YMDHM.value, time_zone="UTC"),
-            )
+            df = df.with_columns(pl.lit(None, pl.String).alias(parameter) for parameter in missing_parameters)
-    elif dataset == DwdObservationMetadata.minute_5.precipitation:
-        # apparently historical datasets differ from recent and now having all columns as described in the
-        # parameter enumeration when recent and now datasets only have precipitation form and
-        # precipitation height but not rocker and droplet information
-        columns = ["stations_id", "mess_datum"]
-        for parameter in dataset:
-            columns.append(parameter.name_original)
-
-        df = df.select(
-            pl.lit(None, dtype=pl.String).alias(col) if col not in df.collect_schema().names() else pl.col(col)
-            for col in columns
-        )
+    elif dataset == DwdObservationMetadata.minute_5.precipitation and period != Period.HISTORICAL:
+        missing_parameters = [
+            DwdObservationMetadata.minute_5.precipitation.precipitation_height_rocker.name_original,
+            DwdObservationMetadata.minute_5.precipitation.precipitation_height_droplet.name_original,
+        ]
+        df = df.with_columns(pl.lit(None, dtype=pl.Float64).alias(parameter) for parameter in missing_parameters)
     # Special handling for hourly solar data, as it has more date columns
     elif dataset == DwdObservationMetadata.hourly.solar:
         # Fix real date column by cutting of minutes
-        df = df.with_columns(pl.col("mess_datum").map_elements(lambda date: date[:-3], return_dtype=pl.String))
+        df = df.with_columns(pl.col("date").str.head(-3))
     elif dataset == DwdObservationMetadata.subdaily.wind_extreme:
+        if "FX3" in filename:
+            alias = "qn_8_3"
+        elif "FX6" in filename:
+            alias = "qn_8_6"
+        else:
+            raise ValueError(f"Unknown dataset for wind extremes, expected FX3 or FX6 in filename {filename}")
         df = df.select(
             pl.all().exclude("qn_8"),
-            pl.col("qn_8").alias("qn_8_3" if "fx_911_3" in df.columns else "qn_8_6"),
+            pl.col("qn_8").alias(alias),
         )
-    if dataset.resolution.value in (Resolution.MONTHLY, Resolution.ANNUAL):
-        df = df.drop("bis_datum", "mess_datum_ende", strict=False)
-        df = df.rename(mapping={"mess_datum_beginn": "mess_datum"})
-
-    fmt = None
-    if dataset.resolution.value == Resolution.MINUTE_5:
-        fmt = "%Y%m%d%H%M"
-    elif dataset.resolution.value == Resolution.MINUTE_10:
-        fmt = "%Y%m%d%H%M"
-    elif dataset.resolution.value == Resolution.HOURLY:
-        fmt = "%Y%m%d%H%M"
-    elif dataset.resolution.value == Resolution.SUBDAILY:
-        fmt = "%Y%m%d%H%M"
-    elif dataset.resolution.value == Resolution.DAILY:
-        fmt = "%Y%m%d"
-    elif dataset.resolution.value == Resolution.MONTHLY:
-        fmt = "%Y%m%d"
-    elif dataset.resolution.value == Resolution.ANNUAL:
-        fmt = "%Y%m%d"
+    if dataset.resolution.value in (Resolution.MONTHLY, Resolution.ANNUAL):
+        df = df.drop("end_date")
+    # prepare date column
+    df = df.with_columns(pl.col("date").cast(pl.String).str.pad_end(12, "0"))
+    return df.with_columns(
+        pl.col("date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
+    )
-
-    if fmt:
-        if dataset.resolution.value in (Resolution.HOURLY, Resolution.SUBDAILY):
-            df = df.with_columns(pl.col("mess_datum") + "00")
-        df = df.with_columns(
-            pl.col("mess_datum").str.to_datetime(fmt, time_zone="UTC"),
-        )
-    # Assign meaningful column names (baseline).
-    return df.rename(mapping=lambda col: DWD_TO_ENGLISH_COLUMNS_MAPPING.get(col, col))
+
+
+def _transform_minute_1_precipitation_historical(df: pl.LazyFrame) -> pl.LazyFrame:
+    """Unfold the historical 1 minute precipitation data, which is run-length encoded,
+    e.g. from time X to time Y precipitation is 0.
+    """
+    df = df.with_columns(
+        pl.col("date").cast(str).str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
+        pl.col("end_date").cast(str).str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
+    )
+    df = df.with_columns(
+        pl.datetime_ranges(pl.col("date"), pl.col("end_date"), interval="1m").alias(
+            "date",
+        ),
+    )
+    df = df.drop(
+        "end_date",
+    )
+    # Expand dataframe over calculated date ranges -> one datetime per row
+    return df.explode("date")
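
To make the run-length encoding concrete, here is a small self-contained example (with made-up values) of the unfold performed by `_transform_minute_1_precipitation_historical`: each row's start/end pair is expanded into one row per minute.

```python
import polars as pl

df = pl.LazyFrame({
    "date": ["202301010000"],
    "end_date": ["202301010003"],
    "precipitation_height": [0.0],
})
df = df.with_columns(
    pl.col("date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
    pl.col("end_date").str.to_datetime("%Y%m%d%H%M", time_zone="UTC"),
)
# build a per-minute range per row, then explode it into individual rows
df = df.with_columns(
    pl.datetime_ranges(pl.col("date"), pl.col("end_date"), interval="1m").alias("date")
)
print(df.drop("end_date").explode("date").collect())  # 4 rows: 00:00 .. 00:03
```
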