[ENH] pytorch forecasting adapter with Global Forecasting API #6228
Conversation
Just a general comment: I would propose splitting this into multiple PRs, which would make it easier to review - a first PR for the pytorch-forecasting adapter, and a second PR that introduces the global forecasting.
just commenting - very interesting!
I suppose a separate class will give us the opportunity to develop this capability. Ultimately, we may decide to merge it into BaseForecaster, or not.
Yeah, splitting this into 2 PRs would make the workflow clearer. But I am using pytorch-forecasting as an experiment to try the global forecasting API design, and ultimately DL models will need the global forecasting API anyway - would it be more convenient to have a single PR?
Good question, @Xinyu-Wu-0000 - as long as this is experimental, it's up to you. It's always good to have one or two examples working for new API development, so it makes sense to have examples in the PR. Although there might be substantial challenges coming from pytorch-forecasting in isolation which have nothing to do with the global forecasting extension (you already list many of the serious ones, e.g., loader, prediction object), so I wonder if there is a simpler example to develop around. Either way, that's not a strict requirement, as long as you are working in an exploratory sense.
Maybe NeuralForecast could be a simpler example, as it's already been interfaced and all models from NeuralForecast are capable of global forecasting, but several PRs are currently working on NeuralForecast. I chose pytorch-forecasting to minimize the impact on the existing code base, as extending the global forecasting API will be quite a big change.
I just made it work for an example from pytorch-forecasting - the first tutorial in the pytorch-forecasting documentation. By the way, are we going to have a release with partial global forecasting API support? Something like version 0.30, with only NeuralForecast models and pytorch-forecasting models supporting the global forecasting API.
Yes, I think that's a valid upgrade plan, e.g., release only some forecasters first, and then later merge base classes if everything is robust. It could even be 0.29.0 in theory, because we're not impacting existing classes with your plan.
I didn't get time to debug the issue with the above script, but here's a small example that works without any X variables. This was tested on Colab this morning:
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
MultiLoss,
QuantileLoss,
TemporalFusionTransformer,
TimeSeriesDataSet,
)
random_generator = numpy.random.default_rng(seed=0)
sample_data = pandas.DataFrame(
{
"endogenous_variable_1": random_generator.uniform(low=-10, high=10, size=100),
"endogenous_variable_2": random_generator.uniform(low=-10, high=10, size=100),
"series_identifier": numpy.repeat(numpy.arange(4), 25),
"temporal_identifier": numpy.tile(numpy.arange(25), 4),
}
)
sample_data
training_dataset = TimeSeriesDataSet(
sample_data,
"temporal_identifier",
["endogenous_variable_1", "endogenous_variable_2"],
["series_identifier"],
max_encoder_length=20,
min_encoder_length=5,
min_prediction_idx=20,
max_prediction_length=5,
min_prediction_length=5,
time_varying_unknown_reals=["endogenous_variable_1", "endogenous_variable_2"],
)
training_dataset.get_parameters()
validation_dataset = TimeSeriesDataSet.from_dataset(
training_dataset, sample_data, stop_randomization=True, predict=True
)
validation_dataset.get_parameters()
training_data_loader = training_dataset.to_dataloader(train=True, batch_size=5, num_workers=0)
validation_data_loader = validation_dataset.to_dataloader(train=False, batch_size=5, num_workers=0)
forecaster = TemporalFusionTransformer.from_dataset(
training_dataset,
lstm_layers=2,
output_size=5,
loss=MultiLoss([QuantileLoss(quantiles=[0.025, 0.05, 0.5, 0.95, 0.975])]),
)
forecaster.hparams
pytorch_trainer = Trainer(accelerator="cpu", max_epochs=5, min_epochs=2)
pytorch_trainer.fit(
forecaster, train_dataloaders=training_data_loader, val_dataloaders=validation_data_loader
)
I changed a little of your example, @yarnabrina. I added […]. If I remove the […], I get:
Epoch 499: 0%| | 0/12800 [00:00<?, ?it/s, v_num=171] `Trainer.fit` stopped: `max_epochs=500` reached.
Epoch 499: 0%| | 0/12800 [00:00<?, ?it/s, v_num=171]
FIT END
TIME: 4.214004278182983
If I add the […], […]. You can change the […]. Script:
from copy import deepcopy
import time
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
MultiLoss,
QuantileLoss,
TemporalFusionTransformer,
TimeSeriesDataSet,
)
from lightning.pytorch.callbacks import EarlyStopping
# False = no log - pass, 500 epoch for less than 5s
# True = do log - fail
log_val = False
# False = no X is passed - fail if log_val
# True = X is passed - pass, 5m for just 1 epoch
X_data = False
random_generator = numpy.random.default_rng(seed=0)
sample_data = pandas.DataFrame(
{
"endogenous_variable_1": random_generator.uniform(
low=-10, high=10, size=100000
),
"endogenous_variable_2": random_generator.uniform(
low=-10, high=10, size=100000
),
"series_identifier": numpy.repeat(numpy.arange(4000), 25),
"temporal_identifier": numpy.tile(numpy.arange(25), 4000),
}
)
sample_data
training_dataset = TimeSeriesDataSet(
sample_data,
"temporal_identifier",
(
["endogenous_variable_1", "endogenous_variable_2"]
if not X_data
else ["endogenous_variable_2"]
),
["series_identifier"],
max_encoder_length=20,
min_encoder_length=5,
min_prediction_idx=20,
max_prediction_length=5,
min_prediction_length=5,
time_varying_unknown_reals=(
["endogenous_variable_1", "endogenous_variable_2"] if not X_data else []
),
time_varying_known_reals=[] if not X_data else ["endogenous_variable_1"],
)
training_dataset.get_parameters()
validation_dataset = TimeSeriesDataSet.from_dataset(
training_dataset, sample_data, stop_randomization=True, predict=True
)
validation_dataset.get_parameters()
training_data_loader = training_dataset.to_dataloader(
train=True, batch_size=5, num_workers=0
)
validation_data_loader = validation_dataset.to_dataloader(
train=False, batch_size=5, num_workers=0
)
forecaster = TemporalFusionTransformer.from_dataset(
training_dataset,
lstm_layers=2,
output_size=5,
loss=MultiLoss([QuantileLoss(quantiles=[0.025, 0.05, 0.5, 0.95, 0.975])]),
log_val_interval=10 if log_val else -1,
)
forecaster.hparams
forecaster_copy = deepcopy(forecaster)
early_stop_callback = EarlyStopping(
monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)
pytorch_trainer = Trainer(
accelerator="cpu",
max_epochs=500 if not X_data else 1,
min_epochs=2,
# callbacks=[early_stop_callback] if log_val else [],
)
start_time = time.time()
print("FIT START")
pytorch_trainer.fit(
forecaster,
train_dataloaders=training_data_loader,
val_dataloaders=validation_data_loader,
)
print("FIT END")
print("TIME: ", time.time() - start_time)
for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
print(p - p_c)
So would a solution be to force […]? What is that parameter doing, anyway?
From the pytorch-forecasting documentation:
Logging validation set metrics is the prerequisite of early stop callback on validation loss.
It won't raise any error, but I am highly suspicious that it does nothing, since it runs 500 epochs in less than 5 seconds on my computer. I added some code to prove it: you can see that no parameters actually change after fitting. If I set […], […]. I don't know why, but if we force […], […].
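For intuition, the rule such an early-stop callback applies can be sketched in plain Python. This is a hypothetical helper mirroring the monitor/min_delta/patience semantics of the EarlyStopping callback used in the scripts here, not lightning's actual implementation:

```python
def early_stop_epoch(losses, min_delta=1e-4, patience=10):
    """Return the epoch index at which training would stop,
    or None if the monitored loss keeps improving.

    Stops once `patience` consecutive epochs pass without the
    loss improving by at least `min_delta` over the best so far.
    """
    best = float("inf")
    waited = 0
    for epoch, loss in enumerate(losses):
        if loss < best - min_delta:  # meaningful improvement
            best = loss
            waited = 0
        else:
            waited += 1
            if waited >= patience:
                return epoch
    return None

# Improvement stalls after epoch 2, so training stops 10 epochs later.
val_losses = [1.0, 0.8, 0.6] + [0.6] * 20
print(early_stop_epoch(val_losses))  # -> 12
```

This also shows why the callback needs validation metrics logged: without a monitored `val_loss` stream, the rule has nothing to compare.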
As discussed just now after the standup (FYI @benHeid), recommendations for @Xinyu-Wu-0000: […]
I have edited the script.
@yarnabrina's code is not really working. If […], […]. If […], […]. Only if […], […]. The script starts with:
from copy import deepcopy
import time
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
MultiLoss,
QuantileLoss,
TemporalFusionTransformer,
TimeSeriesDataSet,
)
from lightning.pytorch.callbacks import EarlyStopping
# False = no log - pass, 500 epoch for less than 5s
# True = do log - fail
log_val = False
# False = no X is passed - fail if log_val
# True = X is passed - pass, 5m for just 1 epoch
X_data = False
random_generator = numpy.random.default_rng(seed=0)
multi_index = pandas.MultiIndex.from_frame(pandas.DataFrame(
{
"series_identifier": numpy.repeat(numpy.arange(4000), 25),
"temporal_identifier": numpy.tile(numpy.arange(25), 4000),
}
))
X = pandas.DataFrame(
{
"exogenous_variable_1": random_generator.uniform(
low=-10, high=10, size=100000
),
}
).set_index(multi_index) if X_data else None
y = pandas.DataFrame(
{
"endogenous_variable_1": random_generator.uniform(
low=-10, high=10, size=100000
),
"endogenous_variable_2": random_generator.uniform(
low=-10, high=10, size=100000
),
}
).set_index(multi_index)
if X is not None:
sample_data = X.join(y, on=["series_identifier", "temporal_identifier"])
else:
sample_data = deepcopy(y)
sample_data.reset_index(level=[0, 1], inplace=True)
print(sample_data.columns)
training_dataset = TimeSeriesDataSet(
data=sample_data,
time_idx="temporal_identifier",
target=(
["endogenous_variable_1", "endogenous_variable_2"]
),
group_ids=["series_identifier"],
max_encoder_length=20,
min_encoder_length=5,
min_prediction_idx=20,
max_prediction_length=5,
min_prediction_length=5,
time_varying_unknown_reals=(
["endogenous_variable_1", "endogenous_variable_2"]
),
time_varying_known_reals=[] if X is None else ["exogenous_variable_1"],
)
training_dataset.get_parameters()
validation_dataset = TimeSeriesDataSet.from_dataset(
training_dataset, sample_data, stop_randomization=True, predict=True
)
validation_dataset.get_parameters()
training_data_loader = training_dataset.to_dataloader(
train=True, batch_size=5, num_workers=0
)
validation_data_loader = validation_dataset.to_dataloader(
train=False, batch_size=5, num_workers=0
)
forecaster = TemporalFusionTransformer.from_dataset(
training_dataset,
lstm_layers=2,
output_size=5,
loss=MultiLoss([QuantileLoss(quantiles=[0.025, 0.05, 0.5, 0.95, 0.975])]),
log_val_interval=10 if log_val else -1,
)
forecaster.hparams
forecaster_copy = deepcopy(forecaster)
early_stop_callback = EarlyStopping(
monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)
pytorch_trainer = Trainer(
accelerator="cpu",
max_epochs=500 if not X_data else 1,
min_epochs=2,
# callbacks=[early_stop_callback] if log_val else [],
)
start_time = time.time()
print("FIT START")
pytorch_trainer.fit(
forecaster,
train_dataloaders=training_data_loader,
val_dataloaders=validation_data_loader,
)
print("FIT END")
print("TIME: ", time.time() - start_time)
for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
print((p - p_c).abs().sum())
@Xinyu-Wu-0000 can you please explain why you think my code is not working? Are the parameters supposed to change before and after training, or the weights?
@yarnabrina, yes, they are supposed to change; the parameters are the weights of the pytorch model.
If you set […].
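The check used in the scripts above can be illustrated without torch: snapshot the parameters before fitting and compare afterwards; a total absolute difference of zero means training did not update anything. A minimal numpy sketch (function and variable names are hypothetical):

```python
import numpy as np

def total_param_change(params_before, params_after):
    """Sum of absolute element-wise differences across all parameter arrays."""
    return sum(
        np.abs(after - before).sum()
        for before, after in zip(params_before, params_after)
    )

rng = np.random.default_rng(0)
weights = [rng.normal(size=(3, 3)), rng.normal(size=3)]
untrained = [w.copy() for w in weights]     # snapshot before "training"
trained = [w + 0.1 for w in weights]        # pretend an optimizer stepped

print(total_param_change(untrained, weights))  # 0.0 -> fit did nothing
print(total_param_change(untrained, trained))  # > 0 -> weights moved
```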
As discussed at the meet-up on 17th May, I edited the script to test […].
Test result: […]
Test script:
from copy import deepcopy
import time
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
MultiLoss,
QuantileLoss,
TemporalFusionTransformer,
NBeats,
NHiTS,
TimeSeriesDataSet,
)
from lightning.pytorch.callbacks import EarlyStopping
# which model to use
# model_class = TemporalFusionTransformer
# model_class = NBeats
model_class = NHiTS
# False = no log - pass, 500 epoch for less than 5s (TFT)
# True = do log - fail (TFT)
log_val = True
# False = no X is passed - fail if log_val (TFT)
# True = X is passed - pass, 5m for just 1 epoch (TFT)
X_data = True
random_generator = numpy.random.default_rng(seed=0)
multi_index = pandas.MultiIndex.from_frame(
pandas.DataFrame(
{
"series_identifier": numpy.repeat(numpy.arange(4000), 25),
"temporal_identifier": numpy.tile(numpy.arange(25), 4000),
}
)
)
X = (
pandas.DataFrame(
{
"exogenous_variable_1": random_generator.uniform(
low=-10, high=10, size=100000
),
}
).set_index(multi_index)
if X_data
else None
)
y = pandas.DataFrame(
{
"endogenous_variable_1": random_generator.uniform(
low=-10, high=10, size=100000
),
}
).set_index(multi_index)
if X is not None:
sample_data = X.join(y, on=["series_identifier", "temporal_identifier"])
else:
sample_data = deepcopy(y)
sample_data.reset_index(level=[0, 1], inplace=True)
print(sample_data.columns)
training_dataset = TimeSeriesDataSet(
data=sample_data,
time_idx="temporal_identifier",
target="endogenous_variable_1",
group_ids=["series_identifier"],
max_encoder_length=20,
min_encoder_length=20,
min_prediction_idx=20,
max_prediction_length=5,
min_prediction_length=5,
time_varying_unknown_reals=["endogenous_variable_1"],
time_varying_known_reals=[] if X is None else ["exogenous_variable_1"],
)
training_dataset.get_parameters()
validation_dataset = TimeSeriesDataSet.from_dataset(
training_dataset, sample_data, stop_randomization=True, predict=True
)
validation_dataset.get_parameters()
training_data_loader = training_dataset.to_dataloader(
train=True, batch_size=5, num_workers=0
)
validation_data_loader = validation_dataset.to_dataloader(
train=False, batch_size=5, num_workers=0
)
forecaster = model_class.from_dataset(
training_dataset,
log_val_interval=10 if log_val else -1,
)
forecaster.hparams
forecaster_copy = deepcopy(forecaster)
early_stop_callback = EarlyStopping(
monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)
pytorch_trainer = Trainer(
accelerator="cpu",
max_epochs=500 if (not X_data and model_class is TemporalFusionTransformer) else 1,
min_epochs=2,
# callbacks=[early_stop_callback] if log_val else [],
)
start_time = time.time()
print("FIT START")
pytorch_trainer.fit(
forecaster,
train_dataloaders=training_data_loader,
val_dataloaders=validation_data_loader,
)
print("FIT END")
print("TIME: ", time.time() - start_time)
for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
print((p - p_c).abs().sum())
So, NBeats fails if X is passed, and TFT fails if X is not passed? That's weird. But I don't understand why logging validation metrics would affect weight updates. Validation data shouldn't affect training at all (except monitoring through callbacks etc., but not weight updates); does that make any sense to you @benHeid @fkiraly?
These errors are probably raised because the algorithm does not support that combination of inputs. At least the original variant of NBeats does not support exogenous features; only NBeatsX supports them. Regarding TFT, after briefly skimming the original paper, it seems that this model always takes exogenous features as input. Since this behavior is not detectable programmatically, I would propose that we add a dict describing the valid combinations and raise an error otherwise.
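The proposed lookup could be as simple as a module-level dict checked once in the adapter's fit. A sketch with hypothetical names (the table entries reflect the observations in this thread, stored as strings so the sketch has no heavy dependencies):

```python
# Hypothetical capability table: whether each interfaced model class
# can consume exogenous features (X). Entries are illustrative only.
SUPPORTS_EXOGENOUS = {
    "TemporalFusionTransformer": True,  # TFT always takes exogenous inputs
    "NBeats": False,                    # original N-BEATS is univariate-only
    "NHiTS": True,
}

def check_model_inputs(model_name: str, has_exogenous: bool) -> None:
    """Raise a descriptive error for unsupported model/input combinations."""
    supported = SUPPORTS_EXOGENOUS.get(model_name)
    if supported is None:
        raise ValueError(f"unknown model: {model_name}")
    if has_exogenous and not supported:
        raise ValueError(f"{model_name} does not support exogenous features (X)")
    if not has_exogenous and model_name == "TemporalFusionTransformer":
        raise ValueError("TemporalFusionTransformer requires exogenous features (X)")

check_model_inputs("NHiTS", has_exogenous=True)  # passes silently
```

Raising early in fit would replace the opaque failures observed above with a clear message about the invalid combination.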
As @yarnabrina mentioned, see sktime/sktime/registry/_tags.py, lines 601 to 607 (commit 2387e12).
With […].
Yes, in that case a […].
As discussed in the mentor meeting and standups, there are two questions: […]
fix: X is None (NBeats model cannot handle X)
fix: index names contain None
fix: column names are not str type
fix: just one timeseries
fix: y type is pd.Series
solve conflicts in pyproject.toml
To summarize the discussion from discord: it would entail a serious API change if we allow a case distinction where […]. For the same reason, […].
Missing X is replaced by a constant dummy for TFT now.
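The dummy-X workaround can be sketched with pandas: when the user passes no X, the adapter fabricates a constant exogenous column so a model that structurally requires known inputs still receives one. The function and column name below are hypothetical placeholders, not the adapter's actual code:

```python
import pandas as pd

def add_dummy_exogenous(y, X=None):
    """Join y with X, or with a constant dummy column when X is missing."""
    if X is None:
        # A constant series carries no information, but satisfies models
        # (like TFT) that structurally require a known exogenous input.
        X = pd.DataFrame({"_dummy_exogenous": 0.0}, index=y.index)
    return X.join(y)

y = pd.DataFrame({"endogenous_variable_1": [1.0, 2.0, 3.0]})
data = add_dummy_exogenous(y, None)
print(data.columns.tolist())  # ['_dummy_exogenous', 'endogenous_variable_1']
```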
Reference Issues/PRs
Related: #4651, #4923
Main Topic
A pytorch forecasting adapter with Global Forecasting API and several algorithms for design validation.
Details
I'm developing a pytorch forecasting adapter with the Global Forecasting API. To ensure a well-designed implementation, I'd like to discuss some design aspects.
New Base Class for Minimal Impact
A new base class, GlobalBaseForecaster, has been added to minimize the impact on existing forecasters and to simplify testing. As discussed in #4651, the plan is to manage the Global Forecasting API via tags only. However, a phased approach might be beneficial: if a tag-based approach is confirmed, we can merge GlobalBaseForecaster back into BaseForecaster after design validation.
Data Type Conversion Challenges
Data type conversion presents a challenge because pytorch-forecasting expects a TimeSeriesDataSet as input. While a TimeSeriesDataSet can be created from a pandas.DataFrame, it requires numerous parameters, and determining where to pass these parameters is a key question. Placing them in fit would introduce an inconsistency with the existing API; putting them in __init__ would be counterintuitive, as it defines how the data conversion works while initializing the algorithm.
A similar issue arises during trainer initialization. Currently, trainer_params: Dict[str, Any] is used within __init__ to obtain trainer initialization parameters. However, the API for passing parameters to trainer.fit is yet to be designed.
To convert pytorch_forecasting.models.base_model.Prediction back to a pandas.DataFrame, a custom conversion method is required. Refer to the following issues for more information: jdb78/pytorch-forecasting#734, jdb78/pytorch-forecasting#177.
Train/Validation Strategy
Training a model in pytorch-forecasting necessitates passing both the training and validation datasets together to the training algorithm. This allows for monitoring training progress, adjusting the learning rate, saving the model, or even stopping training early. This differs from the typical sktime approach, where only the training data is passed to fit and the test data is used for validation after training. Any suggestions on how to best address this discrepancy?
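One possible reconciliation is for the adapter to carve the validation set out of the data passed to fit internally, e.g. by holding out the last few time points of each series. A pandas sketch of such a split, purely illustrative and with hypothetical names, not the adapter's actual behavior:

```python
import numpy as np
import pandas as pd

def temporal_split(panel, time_col, group_col, val_length):
    """Hold out the last `val_length` time points of every series for validation."""
    # latest time index per series, shifted back by the holdout length
    cutoff = panel.groupby(group_col)[time_col].transform("max") - val_length
    train = panel[panel[time_col] <= cutoff]
    validation = panel[panel[time_col] > cutoff]
    return train, validation

# toy panel: 2 series of 10 time points each
panel = pd.DataFrame(
    {
        "series_identifier": np.repeat(np.arange(2), 10),
        "temporal_identifier": np.tile(np.arange(10), 2),
        "endogenous_variable_1": np.arange(20, dtype=float),
    }
)
train, validation = temporal_split(
    panel, "temporal_identifier", "series_identifier", val_length=3
)
print(len(train), len(validation))  # 14 6
```

The adapter could expose `val_length` (or a fraction) as a constructor parameter, keeping fit's signature consistent with the rest of sktime while still giving the lightning trainer a validation loader.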
@benHeid @fkiraly Thank you very much for the feedback on my GSoC proposal! Any suggestions on implementation details or the overall design would be greatly appreciated.