allow missing data for "ts_forecast_panel" task #878

Open · wants to merge 5 commits into base: main
20 changes: 13 additions & 7 deletions flaml/automl/automl.py
@@ -1054,12 +1054,14 @@ def _validate_ts_data(
assert (
not dataframe[[dataframe.columns[0]]].duplicated().any()
), "Duplicate timestamp values with different values for other columns."
ts_series = pd.to_datetime(dataframe[dataframe.columns[0]])
inferred_freq = pd.infer_freq(ts_series)
if inferred_freq is None:
logger.warning(
"Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. "
)
if self._state.task in TS_FORECAST and self._state.task != TS_FORECASTPANEL:
# TFT estimator model for TS_FORECASTPANEL can handle missing data
ts_series = pd.to_datetime(dataframe[dataframe.columns[0]])
inferred_freq = pd.infer_freq(ts_series)
if inferred_freq is None:
logger.warning(
"Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. "
)
if y_train_all is not None:
return dataframe.iloc[:, :-1], dataframe.iloc[:, -1]
return dataframe
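
Note: the new guard relies on pd.infer_freq returning None once timestamps are missing. A minimal illustration (not part of the diff):

import pandas as pd

# complete monthly index: the frequency is recoverable
full = pd.to_datetime(["2022-01-01", "2022-02-01", "2022-03-01", "2022-04-01"])
print(pd.infer_freq(full))  # "MS" (month start)

# with March missing, no single frequency fits every gap
gappy = pd.to_datetime(["2022-01-01", "2022-02-01", "2022-04-01"])
print(pd.infer_freq(gappy))  # None -> triggers the warning above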
@@ -1720,7 +1722,11 @@ def retrain_from_log(
`time_varying_known_categoricals`, `time_varying_known_reals`,
`time_varying_unknown_categoricals`, `time_varying_unknown_reals`,
`variable_groups`. To provide more information on your data, use
`max_encoder_length`, `min_encoder_length`, `lags`.
`max_encoder_length`, `min_encoder_length`, `lags`. To fill in
missing values with constants, use `constant_fill_strategy`;
otherwise a forward-fill strategy is used by default.
freq: str or pandas offset | The frequency of the time series, used only
for the 'ts_forecast_panel' task.
log_dir: str, default = "lightning_logs" | Folder into which to log results
for tensorboard, only used by TemporalFusionTransformerEstimator.
max_epochs: int, default = 20 | Maximum number of epochs to run training,
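For reference, a minimal sketch of how the new fit kwargs might be passed through AutoML.fit(); the column and group names are illustrative, mirroring the test added below:

from flaml import AutoML

automl = AutoML()
automl.fit(
    X_train=X_train,  # panel data: timestamp column first, plus group columns
    y_train=y_train,
    task="ts_forecast_panel",
    period=6,
    group_ids=["agency", "sku"],
    fit_kwargs_by_estimator={
        "tft": {
            "freq": "MS",  # monthly series
            # fill gaps in these columns with constants;
            # other columns are forward-filled by default
            "constant_fill_strategy": {"discount_in_percent": 0.0},
        }
    },
)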
26 changes: 20 additions & 6 deletions flaml/automl/data.py
@@ -8,6 +8,7 @@
from pandas import DataFrame, Series

from flaml.automl.training_log import training_log_reader
# from flaml.automl.automl import logger

from datetime import datetime
from typing import Union
@@ -254,19 +255,32 @@ def add_time_idx_col(X):
unique_dates = X[TS_TIMESTAMP_COL].drop_duplicates().sort_values(ascending=True)
# assume no missing timestamps
freq = pd.infer_freq(unique_dates)
if freq is None:
# if missing timestamps, try to guess frequency from given time stamps
lookup_table = {
pd.Timedelta('31 days 00:00:00'): "MS",
pd.Timedelta('365 days 00:00:00'): "Y",
}
diff = [X[TS_TIMESTAMP_COL][idx + 1] - X[TS_TIMESTAMP_COL][idx] for idx in range(len(X[TS_TIMESTAMP_COL]) - 1)]
med_diff = np.median(diff)
freq = med_diff if med_diff not in lookup_table else lookup_table[med_diff]
# logger.warning(
# f"Missing Timestamps detected. Inferred Frequency: {freq}."
# f"If inferred frequency is incorrect, please pass in `freq` as a fit_kwargs"
# )
if freq == "MS":
X["time_idx"] = X[TS_TIMESTAMP_COL].dt.year * 12 + X[TS_TIMESTAMP_COL].dt.month
elif freq == "Y":
X["time_idx"] = X[TS_TIMESTAMP_COL].dt.year
else:
# using time frequency to generate all time stamps and then indexing for time_idx
# full_range = pd.date_range(X[TS_TIMESTAMP_COL].min(), X[TS_TIMESTAMP_COL].max(), freq=freq).to_list()
# X["time_idx"] = [full_range.index(time) for time in X[TS_TIMESTAMP_COL]]
full_range = pd.date_range(X[TS_TIMESTAMP_COL].min(), X[TS_TIMESTAMP_COL].max(), freq=freq).to_list()
X["time_idx"] = [full_range.index(time) for time in X[TS_TIMESTAMP_COL]]
# taking minimum difference in timestamp
timestamps = unique_dates.view("int64")
freq = int(timestamps.diff().mode())
X["time_idx"] = timestamps - timestamps.min() / freq
X["time_idx"] = X["time_idx"].astype("int")
# timestamps = unique_dates.view("int64")
# freq = int(timestamps.diff().median())
# X["time_idx"] = timestamps - timestamps.min() / freq
# X["time_idx"] = X["time_idx"].astype("int")
return X


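A minimal sketch (not the PR's exact code path) of the median-gap fallback used above when pd.infer_freq fails:

import numpy as np
import pandas as pd

# monthly timestamps with March missing: infer_freq returns None
ts = pd.to_datetime(["2022-01-01", "2022-02-01", "2022-04-01", "2022-05-01"])
assert pd.infer_freq(ts) is None

# guess the frequency from the median gap between consecutive timestamps
gaps = ts[1:] - ts[:-1]  # 31, 59, 30 days
med_diff = pd.Timedelta(np.median(gaps))
lookup_table = {pd.Timedelta("31 days"): "MS", pd.Timedelta("365 days"): "Y"}
print(lookup_table.get(med_diff, med_diff))  # "MS" -> treated as monthly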
53 changes: 33 additions & 20 deletions flaml/automl/model.py
@@ -1808,10 +1808,11 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
cols.remove(TS_VALUE_COL)
logging.getLogger("prophet").setLevel(logging.WARNING)
model = Prophet(**self.params)
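# "period" is FLAML's forecast-horizon argument, not a Prophet.fit() argument, so drop it before fitting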
kwargs.pop("period")
for regressor in cols:
model.add_regressor(regressor)
with suppress_stdout_stderr():
model.fit(train_df)
model.fit(train_df, **kwargs)
train_time = time.time() - current_time
self._model = model
return train_time
@@ -1901,8 +1902,9 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
enforce_stationarity=False,
enforce_invertibility=False,
)
kwargs.pop("period")
with suppress_stdout_stderr():
model = model.fit()
model = model.fit(**kwargs)
train_time = time.time() - current_time
self._model = model
return train_time
@@ -2013,8 +2015,9 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
enforce_stationarity=False,
enforce_invertibility=False,
)
kwargs.pop("period")
with suppress_stdout_stderr():
model = model.fit()
model = model.fit(**kwargs)
train_time = time.time() - current_time
self._model = model
return train_time
@@ -2047,7 +2050,7 @@ def search_space(cls, data_size, pred_horizon, **params):

def __init__(self, task="ts_forecast", **params):
super().__init__(task, **params)
self.hcrystaball_model = None
self.hcrystalball_model = None
self.ts_task = (
"regression" if task in TS_FORECASTREGRESSION else "classification"
)
@@ -2072,32 +2075,33 @@ def _fit(self, X_train, y_train, budget=None, **kwargs):
lags = params.pop("lags")
optimize_for_horizon = params.pop("optimize_for_horizon")
estimator = self.base_class(task=self.ts_task, **params)
self.hcrystaball_model = get_sklearn_wrapper(estimator.estimator_class)
self.hcrystaball_model.lags = int(lags)
self.hcrystaball_model.fit(X_train, y_train)
self.hcrystalball_model = get_sklearn_wrapper(estimator.estimator_class)
self.hcrystalball_model.lags = int(lags)
period = kwargs.pop("period")
self.hcrystalball_model.fit(X_train, y_train, **kwargs)
if optimize_for_horizon:
# Direct Multi-step Forecast Strategy - fit a separate model for each horizon
model_list = []
for i in range(1, kwargs["period"] + 1):
for i in range(1, period + 1):
(
X_fit,
y_fit,
) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(
) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(
X_train, y_train, i
)
self.hcrystaball_model.model.set_params(**estimator.params)
model = self.hcrystaball_model.model.fit(X_fit, y_fit)
self.hcrystalball_model.model.set_params(**estimator.params)
model = self.hcrystalball_model.model.fit(X_fit, y_fit, **kwargs)
model_list.append(model)
self._model = model_list
else:
(
X_fit,
y_fit,
) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(
X_train, y_train, kwargs["period"]
) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(
X_train, y_train, period
)
self.hcrystaball_model.model.set_params(**estimator.params)
model = self.hcrystaball_model.model.fit(X_fit, y_fit)
self.hcrystalball_model.model.set_params(**estimator.params)
model = self.hcrystalball_model.model.fit(X_fit, y_fit, **kwargs)
self._model = model

def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
@@ -2119,20 +2123,20 @@ def predict(self, X, **kwargs):
(
X_pred,
_,
) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(
) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(
X.iloc[:i, :]
)
preds.append(self._model[i - 1].predict(X_pred, **kwargs)[-1])
forecast = DataFrame(
data=np.asarray(preds).reshape(-1, 1),
columns=[self.hcrystaball_model.name],
columns=[self.hcrystalball_model.name],
index=X.index,
)
else:
(
X_pred,
_,
) = self.hcrystaball_model._transform_data_to_tsmodel_input_format(X)
) = self.hcrystalball_model._transform_data_to_tsmodel_input_format(X)
forecast = self._model.predict(X_pred, **kwargs)
return forecast
else:
@@ -2246,6 +2250,8 @@ def transform_ds(self, X_train, y_train, **kwargs):
variable_groups=kwargs.get(
"variable_groups", {}
), # group of categorical variables can be treated as one variable
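# fill gaps in the listed columns with constants (others are forward-filled);
# allow_missing_timesteps lets TimeSeriesDataSet tolerate gaps in time_idx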
constant_fill_strategy=kwargs.get("constant_fill_strategy", {}),
allow_missing_timesteps=True,
lags=kwargs.get("lags", {}),
target_normalizer=GroupNormalizer(
groups=kwargs["group_ids"], transformation="softplus"
@@ -2293,7 +2299,7 @@ def fit(self, X_train, y_train, budget=None, free_mem_ratio=0, **kwargs):
monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)

def _fit(log):
def _fit(log, **kwargs):
default_trainer_kwargs = dict(
gpus=kwargs.get("gpu_per_trial", [0])
if torch.cuda.is_available()
@@ -2323,6 +2329,7 @@ def _fit(log):
tft,
train_dataloaders=train_dataloader,
val_dataloaders=val_dataloader,
**kwargs
)
return trainer

@@ -2336,7 +2343,13 @@ def _fit(log):
# except ValueError:
# issue with pytorch forecasting model log_prediction() function
# pytorch-forecasting issue #1145
trainer = _fit(log=False)
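# drop FLAML-/dataset-level kwargs so only pytorch_lightning Trainer.fit() arguments are forwarded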
fit_keys = ["period", "gpu_per_trial", "group_ids", "freq", "log_dir", "max_epochs", "batch_size",
"static_categoricals", "static_reals", "time_varying_known_categoricals", "time_varying_known_reals",
"time_varying_unknown_reals", "time_varying_unknown_categoricals", "variable_groups",
"max_encoder_length", "min_encoder_length", "lags", "constant_fill_strategy"]
for key in fit_keys:
kwargs.pop(key, None)
trainer = _fit(log=False, **kwargs)
best_model_path = trainer.checkpoint_callback.best_model_path
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path)
train_time = time.time() - current_time
134 changes: 128 additions & 6 deletions test/automl/test_forecast.py
@@ -591,10 +591,132 @@ def smape(y_pred, y_test):
print(automl.min_resource)


def test_forecast_panel_missing(budget=5):
data, special_days = get_stalliion_data()
time_horizon = 6 # predict six months
training_cutoff = data["time_idx"].max() - time_horizon
data["time_idx"] = data["time_idx"].astype("int")
ts_col = data.pop("date")
data.insert(0, "date", ts_col)
# FLAML assumes input is not sorted, but we sort here for comparison purposes with y_test
data = data.sort_values(["agency", "sku", "date"])
X_train = data[lambda x: x.time_idx <= training_cutoff]
print(X_train.head())
print("Before Drop", len(X_train))
np.random.seed(1)
drop_indices = np.random.choice(X_train.index, 10, replace=False) # randomly choose 10 rows of train data to remove
X_train = X_train.drop(drop_indices)
print("After Drop", len(X_train))
X_test = data[lambda x: x.time_idx > training_cutoff]
y_train = X_train.pop("volume")
y_test = X_test.pop("volume")
print(len(X_train))
print(len(y_train))
print(len(X_test))
print(len(y_test))
automl = AutoML()
settings = {
"time_budget": budget, # total running time in seconds
"metric": "mape", # primary metric
"task": "ts_forecast_panel", # task type
"log_file_name": "test/stallion_forecast_missing.log", # flaml log file
"eval_method": "holdout",
}
fit_kwargs_by_estimator = {
"tft": {
"max_encoder_length": 24,
"static_categoricals": ["agency", "sku"],
"static_reals": ["avg_population_2017", "avg_yearly_household_income_2017"],
"time_varying_known_categoricals": ["special_days", "month"],
"variable_groups": {
"special_days": special_days
}, # group of categorical variables can be treated as one variable
"time_varying_known_reals": [
"time_idx",
"price_regular",
"discount_in_percent",
],
"time_varying_unknown_categoricals": [],
"time_varying_unknown_reals": [
"y", # always need a 'y' column for the target column
"log_volume",
"industry_volume",
"soda_volume",
"avg_max_temp",
"avg_volume_by_agency",
"avg_volume_by_sku",
],
"batch_size": 256,
"max_epochs": 1,
"gpu_per_trial": -1,
"constant_fill_strategy": {
"discount_in_percent": 0.0,
"soda_volume": 0.0,
} # for the gaps, use these constants to fill in missing values
}
}
"""The main flaml automl API"""
automl.fit(
X_train=X_train,
y_train=y_train,
**settings,
period=time_horizon,
group_ids=["agency", "sku"],
fit_kwargs_by_estimator=fit_kwargs_by_estimator,
)
""" retrieve best config and best learner"""
print("Best ML leaner:", automl.best_estimator)
print("Best hyperparmeter config:", automl.best_config)
print(f"Best mape on validation data: {automl.best_loss}")
print(f"Training duration of best run: {automl.best_config_train_time}s")
print(automl.model.estimator)
""" pickle and save the automl object """
import pickle

with open("automl.pkl", "wb") as f:
pickle.dump(automl, f, pickle.HIGHEST_PROTOCOL)
""" compute predictions of testing dataset """
y_pred = automl.predict(X_test)
""" compute different metric values on testing dataset"""
from flaml.automl.ml import sklearn_metric_loss_score

print(y_test)
print(y_pred)
print("mape", "=", sklearn_metric_loss_score("mape", y_pred, y_test))

def smape(y_pred, y_test):
import numpy as np

y_test, y_pred = np.array(y_test), np.array(y_pred)
return round(
np.mean(np.abs(y_pred - y_test) / ((np.abs(y_pred) + np.abs(y_test)) / 2))
* 100,
2,
)

print("smape", "=", smape(y_pred, y_test))
from flaml.automl.data import get_output_from_log

(
time_history,
best_valid_loss_history,
valid_loss_history,
config_history,
metric_history,
) = get_output_from_log(filename=settings["log_file_name"], time_budget=budget)
for config in config_history:
print(config)
print(automl.resource_attr)
print(automl.max_resource)
print(automl.min_resource)


if __name__ == "__main__":
test_forecast_automl(60)
test_multivariate_forecast_num(5)
test_multivariate_forecast_cat(5)
test_numpy()
test_forecast_classification(5)
test_forecast_panel(5)
# test_forecast_automl(60)
# test_multivariate_forecast_num(5)
# test_multivariate_forecast_cat(5)
# test_numpy()
# test_forecast_classification(5)
# test_forecast_panel(5)
print("testing missing data")
test_forecast_panel_missing(5)