
[ENH] pytorch forecasting adapter with Global Forecasting API #6228

Open
wants to merge 47 commits into base: main
Conversation

Xinyu-Wu-0000 (Contributor)

Reference Issues/PRs

Related: #4651, #4923

Main Topic

A pytorch forecasting adapter with Global Forecasting API and several algorithms for design validation.

Details

I'm developing a pytorch forecasting adapter with the Global Forecasting API. To ensure a well-designed implementation, I'd like to discuss some design aspects.

New Base Class for Minimal Impact

A new base class, GlobalBaseForecaster, has been added to minimize the impact on existing forecasters and simplify testing. As discussed in #4651, the plan is to manage the Global Forecasting API via tags only. However, a phased approach might be beneficial. If a tag-based approach is confirmed, we can merge GlobalBaseForecaster back into BaseForecaster after design validation.

Data Type Conversion Challenges

Data type conversion presents a challenge because PyTorch forecasting expects TimeSeriesDataSet as input. While TimeSeriesDataSet can be created from a pandas.DataFrame, it requires numerous parameters. Determining where to pass these parameters is a key question.

Placing them in fit would introduce inconsistency with the existing API. If we put them in __init__, it would be very counterintuitive to define how the data conversion works while initializing the algorithm.

A similar issue arises during trainer initialization. Currently, trainer_params: Dict[str, Any] is used within __init__ to obtain trainer initialization parameters. However, the API for passing these parameters to trainer.fit is yet to be designed.
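One option, sketched below, is to accept two separate dicts in `__init__`: one forwarded to `Trainer(...)` and one to `Trainer.fit(...)`. This is a hypothetical sketch, not the PR's actual design; the class and parameter names are illustrative only.

```python
# Hypothetical sketch (not the PR's actual adapter class): keep Trainer
# construction parameters and Trainer.fit parameters in two separate dicts,
# both supplied in __init__, so the sktime fit signature stays unchanged.
class PytorchForecastingAdapterSketch:
    def __init__(self, trainer_params=None, trainer_fit_params=None):
        # parameters that would be forwarded to lightning.pytorch.Trainer(...)
        self.trainer_params = dict(trainer_params or {})
        # parameters that would be forwarded to Trainer.fit(...)
        self.trainer_fit_params = dict(trainer_fit_params or {})


adapter = PytorchForecastingAdapterSketch(
    trainer_params={"accelerator": "cpu", "max_epochs": 5},
    trainer_fit_params={},
)
```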

To convert pytorch_forecasting.models.base_model.Prediction back to a pandas.DataFrame, a custom conversion method is required. Refer to the following issues for more information: jdb78/pytorch-forecasting#734, jdb78/pytorch-forecasting#177.
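As a sketch of what such a custom conversion could look like: the helper below is hypothetical (not pytorch-forecasting API) and assumes the point forecasts have already been extracted from the Prediction object into an `(n_series, horizon)` array.

```python
import numpy as np
import pandas as pd


# Hypothetical helper: reshape an (n_series, horizon) array of point
# forecasts into a MultiIndex pandas.DataFrame of the kind sktime expects.
def predictions_to_frame(values, group_ids, first_time_idx, column):
    n_series, horizon = values.shape
    index = pd.MultiIndex.from_product(
        [group_ids, range(first_time_idx, first_time_idx + horizon)],
        names=["series_identifier", "temporal_identifier"],
    )
    # row-major flatten: series 0 gets the first `horizon` rows, and so on
    return pd.DataFrame({column: values.reshape(-1)}, index=index)


preds = np.arange(6.0).reshape(2, 3)  # 2 series, horizon of 3
frame = predictions_to_frame(preds, [0, 1], 20, "endogenous_variable_1")
```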

Train/Validation Strategy

Training a model in PyTorch forecasting necessitates passing both the training and validation datasets together to the training algorithm. This allows for monitoring training progress, adjusting the learning rate, saving the model, or even stopping training prematurely. This differs from the typical sktime approach where only the training data is passed to fit and the test data is used for validation after training. Any suggestions on how to best address this discrepancy?
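One possible way to bridge this gap, sketched below under the assumption of a (series_identifier, temporal_identifier) MultiIndex: carve an internal validation set out of the training data inside `fit` by holding back the last k time points of every series, so that only training data crosses the sktime API boundary. The helper name is hypothetical.

```python
import numpy as np
import pandas as pd


# Hypothetical sketch: hold back the last k time points of every series as
# an internal validation set, keeping the sktime-facing fit signature as-is.
def split_last_k(y, k):
    time_idx = y.index.get_level_values("temporal_identifier")
    cutoff = time_idx.max() - k
    return y[time_idx <= cutoff], y[time_idx > cutoff]


index = pd.MultiIndex.from_product(
    [range(2), range(10)], names=["series_identifier", "temporal_identifier"]
)
y = pd.DataFrame({"endogenous_variable_1": np.arange(20.0)}, index=index)
y_train, y_val = split_last_k(y, k=3)
```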

@benHeid @fkiraly Thank you very much for the feedback on my GSoC proposal! Any suggestions on implementation details or the overall design would be greatly appreciated.

@Xinyu-Wu-0000 Xinyu-Wu-0000 changed the title [ENH] Global pytorch-forecasting [ENH] pytorch forecasting adapter with Global Forecasting API Mar 28, 2024
benHeid (Contributor) commented Mar 30, 2024

Just a general comment. I would propose to split this into multiple PRs. This would make it easier to review. I would propose a PR for the pytorch-forecasting adapter (first PR) and a second PR that introduces the global forecasting.

@fkiraly (Collaborator) left a comment

just commenting - very interesting!

I suppose a separate class will give us the opportunity to develop this capability. Ultimately, we may decide to merge it into BaseForecaster, or not.

@Xinyu-Wu-0000 (Contributor Author)

Just a general comment. I would propose to split this into multiple PRs. This would make it easier to review. I would propose a PR for the pytorch-forecasting adapter (first PR) and a second PR that introduces the global forecasting.

Yeah, splitting this into 2 PRs would make the workflow clearer. But I am using pytorch-forecasting as an experiment to try the global forecasting API design, and ultimately DL models will need the global forecasting API anyway. Would it be more convenient to have a single PR?

fkiraly (Collaborator) commented Mar 31, 2024

Good question, @Xinyu-Wu-0000 - as long as this is experimental, it's up to you.

It's always good to have one or two examples working for new API development, so it makes sense to have examples in the PR.

Although there might be substantial challenges coming from pytorch-forecasting itself which have nothing to do with the global forecasting extension (you already list many of the serious ones, e.g., loader, prediction object), so I wonder if there is a simpler example to develop around.

Either way, that's not a strict requirement, as long as you are working in an exploratory sense.

@fkiraly fkiraly added module:forecasting forecasting module: forecasting, incl probabilistic and hierarchical forecasting enhancement Adding new functionality labels Mar 31, 2024
@Xinyu-Wu-0000 (Contributor Author)

so I wonder if there is a simpler example to develop around.

Maybe NeuralForecast could be a simpler example, as it's already been interfaced and all models from NeuralForecast are capable of global forecasting, but several PRs are currently working on NeuralForecast. I chose pytorch-forecasting to minimize the impact on the existing codebase, as extending the global forecasting API will be quite a big change.

It's always good to have one or two examples working for new API development, so it makes sense ot have examples in the PR.

I just made it work for an example from pytorch-forecasting. It is the first tutorial in the pytorch-forecasting documentation.
The test script for the example I use:
test_script.txt

By the way, are we going to have a release with partial global forecasting API support? Something like version 0.30, with only NeuralForecast models and pytorch-forecasting models supporting the global forecasting API.

fkiraly (Collaborator) commented Apr 1, 2024

By the way, are we going to have a release with partial global forecasting API support?

Yes, I think that's a valid upgrade plan, e.g., release first only some forecasters, and then later merge base classes if everything is robust.

It could be 0.29.0 even in theory, because we're not impacting existing classes with your plan.

@yarnabrina (Collaborator)

I didn't get time to debug what the issue is with the above script, but here's a small example that works without any X variables. This was tested on Colab this morning with pytorch-forecasting=1.0.0, pytorch-lightning=2.2.4 and torch=2.2.1.

import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
    MultiLoss,
    QuantileLoss,
    TemporalFusionTransformer,
    TimeSeriesDataSet,
)

random_generator = numpy.random.default_rng(seed=0)

sample_data = pandas.DataFrame(
    {
        "endogenous_variable_1": random_generator.uniform(low=-10, high=10, size=100),
        "endogenous_variable_2": random_generator.uniform(low=-10, high=10, size=100),
        "series_identifier": numpy.repeat(numpy.arange(4), 25),
        "temporal_identifier": numpy.tile(numpy.arange(25), 4),
    }
)

sample_data

training_dataset = TimeSeriesDataSet(
    sample_data,
    "temporal_identifier",
    ["endogenous_variable_1", "endogenous_variable_2"],
    ["series_identifier"],
    max_encoder_length=20,
    min_encoder_length=5,
    min_prediction_idx=20,
    max_prediction_length=5,
    min_prediction_length=5,
    time_varying_unknown_reals=["endogenous_variable_1", "endogenous_variable_2"],
)

training_dataset.get_parameters()

validation_dataset = TimeSeriesDataSet.from_dataset(
    training_dataset, sample_data, stop_randomization=True, predict=True
)

validation_dataset.get_parameters()

training_data_loader = training_dataset.to_dataloader(train=True, batch_size=5, num_workers=0)

validation_data_loader = validation_dataset.to_dataloader(train=False, batch_size=5, num_workers=0)

forecaster = TemporalFusionTransformer.from_dataset(
    training_dataset,
    lstm_layers=2,
    output_size=5,
    loss=MultiLoss([QuantileLoss(quantiles=[0.025, 0.05, 0.5, 0.95, 0.975])]),
)

forecaster.hparams

pytorch_trainer = Trainer(accelerator="cpu", max_epochs=5, min_epochs=2)

pytorch_trainer.fit(
    forecaster, train_dataloaders=training_data_loader, val_dataloaders=validation_data_loader
)

Xinyu-Wu-0000 (Contributor Author) commented May 16, 2024

I changed your example a little, @yarnabrina. I added log_val_interval to the TFT model. I also increased the size of the dataset to 100000 rows and max_epochs to 500.

If I remove log_val_interval, it passes, but far faster than normal: 500 epochs in less than 5s:

Epoch 499:   0%|                                                                              | 0/12800 [00:00<?, ?it/s, v_num=171]`Trainer.fit` stopped: `max_epochs=500` reached.
Epoch 499:   0%|                                                                              | 0/12800 [00:00<?, ?it/s, v_num=171]
FIT END
TIME:  4.214004278182983

If I add log_val_interval, it raises the same error as my script.

You can change log_val_interval by toggling log_val at the beginning of the edited example.

from copy import deepcopy
import time
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
    MultiLoss,
    QuantileLoss,
    TemporalFusionTransformer,
    TimeSeriesDataSet,
)
from lightning.pytorch.callbacks import EarlyStopping

# False = no log - pass, 500 epoch for less than 5s
# True = do log - fail
log_val = False

# False = no X is passed - fail if log_val
# True = X is passed - pass, 5m for just 1 epoch
X_data = False

random_generator = numpy.random.default_rng(seed=0)

sample_data = pandas.DataFrame(
    {
        "endogenous_variable_1": random_generator.uniform(
            low=-10, high=10, size=100000
        ),
        "endogenous_variable_2": random_generator.uniform(
            low=-10, high=10, size=100000
        ),
        "series_identifier": numpy.repeat(numpy.arange(4000), 25),
        "temporal_identifier": numpy.tile(numpy.arange(25), 4000),
    }
)

sample_data

training_dataset = TimeSeriesDataSet(
    sample_data,
    "temporal_identifier",
    (
        ["endogenous_variable_1", "endogenous_variable_2"]
        if not X_data
        else ["endogenous_variable_2"]
    ),
    ["series_identifier"],
    max_encoder_length=20,
    min_encoder_length=5,
    min_prediction_idx=20,
    max_prediction_length=5,
    min_prediction_length=5,
    time_varying_unknown_reals=(
        ["endogenous_variable_1", "endogenous_variable_2"] if not X_data else []
    ),
    time_varying_known_reals=[] if not X_data else ["endogenous_variable_1"],
)

training_dataset.get_parameters()

validation_dataset = TimeSeriesDataSet.from_dataset(
    training_dataset, sample_data, stop_randomization=True, predict=True
)

validation_dataset.get_parameters()

training_data_loader = training_dataset.to_dataloader(
    train=True, batch_size=5, num_workers=0
)

validation_data_loader = validation_dataset.to_dataloader(
    train=False, batch_size=5, num_workers=0
)

forecaster = TemporalFusionTransformer.from_dataset(
    training_dataset,
    lstm_layers=2,
    output_size=5,
    loss=MultiLoss([QuantileLoss(quantiles=[0.025, 0.05, 0.5, 0.95, 0.975])]),
    log_val_interval=10 if log_val else -1,
)

forecaster.hparams
forecaster_copy = deepcopy(forecaster)

early_stop_callback = EarlyStopping(
    monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)
pytorch_trainer = Trainer(
    accelerator="cpu",
    max_epochs=500 if not X_data else 1,
    min_epochs=2,
    # callbacks=[early_stop_callback] if log_val else [],
)

start_time = time.time()
print("FIT START")
pytorch_trainer.fit(
    forecaster,
    train_dataloaders=training_data_loader,
    val_dataloaders=validation_data_loader,
)
print("FIT END")
print("TIME: ", time.time() - start_time)

for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
    print(p - p_c)

fkiraly (Collaborator) commented May 16, 2024

So would a solution be to force log_val_interval to False if X is not passed?

What is that parameter doing, anyway?

Xinyu-Wu-0000 (Contributor Author) commented May 17, 2024

What is that parameter doing

From the pytorch-forecasting documentation:

  • log_val_interval – frequency with which to log validation set metrics, defaults to log_interval

Logging validation set metrics is a prerequisite for the early stopping callback on validation loss.

So would a solution be to force log_val_interval to False if X is not passed?

It won't raise any error, but I am highly suspicious that it does nothing, since it runs 500 epochs in less than 5s on my computer. I added some code to prove it. You can see that no parameters actually change after fitting. If I set X_data = True, it takes 5m to run 1 epoch, and the parameters change.

I don't know why, but if we force log_val_interval to False when no X is passed, it gives no error just because it does nothing.

fkiraly (Collaborator) commented May 17, 2024

As discussed right now after the standup, FYI @benHeid, recommendations for @Xinyu-Wu-0000:

  • Can you write your manual script in a way that at the start you have X and y in the format you would see inside _fit? That is, MultiIndex pandas.
  • Keep the X_data flag, but it should control whether X is set to None at the start. Do not use it when invoking pytorch-forecasting.
  • If the case X_data=False then does not work, compare to @yarnabrina's working code and use it to get your code working.

@Xinyu-Wu-0000 (Contributor Author)

I have edited the script.

  • If the case X_data=False then does not work, compare to @yarnabrina's working code and use it to get your code working.

@yarnabrina's code is not really working. If X_data and log_val are set to False, then it's @yarnabrina's code: it does not raise an error, but it does not fit the data either. The parameters are printed at the end of the script, showing no changes after fitting.

If log_val is set to True and X_data is False, it fails.

Only if X_data = True does it really fit the data.

Script starts with X, y in MultiIndex pandas frame format:

from copy import deepcopy
import time
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
    MultiLoss,
    QuantileLoss,
    TemporalFusionTransformer,
    TimeSeriesDataSet,
)
from lightning.pytorch.callbacks import EarlyStopping


# False = no log - pass, 500 epoch for less than 5s
# True = do log - fail
log_val = False

# False = no X is passed - fail if log_val
# True = X is passed - pass, 5m for just 1 epoch
X_data = False

random_generator = numpy.random.default_rng(seed=0)

multi_index = pandas.MultiIndex.from_frame(pandas.DataFrame(
    {
        "series_identifier": numpy.repeat(numpy.arange(4000), 25),
        "temporal_identifier": numpy.tile(numpy.arange(25), 4000),
    }
))
X = pandas.DataFrame(
    {
        "exogenous_variable_1": random_generator.uniform(
            low=-10, high=10, size=100000
        ),
    }
).set_index(multi_index) if X_data else None
y = pandas.DataFrame(
    {
        "endogenous_variable_1": random_generator.uniform(
            low=-10, high=10, size=100000
        ),
        "endogenous_variable_2": random_generator.uniform(
            low=-10, high=10, size=100000
        ),
    }
).set_index(multi_index)


if X is not None:
    sample_data = X.join(y, on=["series_identifier", "temporal_identifier"])
else:
    sample_data = deepcopy(y)
sample_data.reset_index(level=[0, 1], inplace=True)
print(sample_data.columns)

training_dataset = TimeSeriesDataSet(
    data=sample_data,
    time_idx="temporal_identifier",
    target=(
        ["endogenous_variable_1", "endogenous_variable_2"]
    ),
    group_ids=["series_identifier"],
    max_encoder_length=20,
    min_encoder_length=5,
    min_prediction_idx=20,
    max_prediction_length=5,
    min_prediction_length=5,
    time_varying_unknown_reals=(
        ["endogenous_variable_1", "endogenous_variable_2"]
    ),
    time_varying_known_reals=[] if X is None else ["exogenous_variable_1"],
)

training_dataset.get_parameters()

validation_dataset = TimeSeriesDataSet.from_dataset(
    training_dataset, sample_data, stop_randomization=True, predict=True
)

validation_dataset.get_parameters()

training_data_loader = training_dataset.to_dataloader(
    train=True, batch_size=5, num_workers=0
)

validation_data_loader = validation_dataset.to_dataloader(
    train=False, batch_size=5, num_workers=0
)

forecaster = TemporalFusionTransformer.from_dataset(
    training_dataset,
    lstm_layers=2,
    output_size=5,
    loss=MultiLoss([QuantileLoss(quantiles=[0.025, 0.05, 0.5, 0.95, 0.975])]),
    log_val_interval=10 if log_val else -1,
)

forecaster.hparams
forecaster_copy = deepcopy(forecaster)

early_stop_callback = EarlyStopping(
    monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)
pytorch_trainer = Trainer(
    accelerator="cpu",
    max_epochs=500 if not X_data else 1,
    min_epochs=2,
    # callbacks=[early_stop_callback] if log_val else [],
)

start_time = time.time()
print("FIT START")
pytorch_trainer.fit(
    forecaster,
    train_dataloaders=training_data_loader,
    val_dataloaders=validation_data_loader,
)
print("FIT END")
print("TIME: ", time.time() - start_time)

for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
    print((p - p_c).abs().sum())

@yarnabrina (Collaborator)

@Xinyu-Wu-0000 can you please explain why you think my code is not working? Are the parameters supposed to change before and after training, or the weights?

Xinyu-Wu-0000 (Contributor Author) commented May 17, 2024

@yarnabrina, yes, they are supposed to change; the parameters are the weights of the pytorch model.

for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
    print((p - p_c).abs().sum())

If you set X_data = True, they will change.

Xinyu-Wu-0000 (Contributor Author) commented May 17, 2024

As discussed on the meet-up on 17th May, I edited the script to test

  • univariate target
  • NBeats or NHiTS or TemporalFusionTransformer
  • log_val_interval or not
  • X_data or not

Test result:

| model | log_val_interval | X_data | Error | weights changed? |
| --- | --- | --- | --- | --- |
| Nbeats | yes | no | no | yes |
| NHiTS | yes | no | no | yes |
| TFT | yes | no | yes | - |
| TFT | no | yes | no | no |
| TFT | yes | yes | no | yes |
| Nbeats | yes | yes | yes | - |
| NHiTS | yes | yes | no | yes |

Test script:

from copy import deepcopy
import time
import numpy
import pandas
from lightning.pytorch import Trainer
from pytorch_forecasting import (
    MultiLoss,
    QuantileLoss,
    TemporalFusionTransformer,
    NBeats,
    NHiTS,
    TimeSeriesDataSet,
)
from lightning.pytorch.callbacks import EarlyStopping

# which model to use
# model_class = TemporalFusionTransformer
# model_class = NBeats
model_class = NHiTS

# False = no log - pass, 500 epoch for less than 5s (TFT)
# True = do log - fail (TFT)
log_val = True

# False = no X is passed - fail if log_val (TFT)
# True = X is passed - pass, 5m for just 1 epoch (TFT)
X_data = True


random_generator = numpy.random.default_rng(seed=0)

multi_index = pandas.MultiIndex.from_frame(
    pandas.DataFrame(
        {
            "series_identifier": numpy.repeat(numpy.arange(4000), 25),
            "temporal_identifier": numpy.tile(numpy.arange(25), 4000),
        }
    )
)
X = (
    pandas.DataFrame(
        {
            "exogenous_variable_1": random_generator.uniform(
                low=-10, high=10, size=100000
            ),
        }
    ).set_index(multi_index)
    if X_data
    else None
)
y = pandas.DataFrame(
    {
        "endogenous_variable_1": random_generator.uniform(
            low=-10, high=10, size=100000
        ),
    }
).set_index(multi_index)


if X is not None:
    sample_data = X.join(y, on=["series_identifier", "temporal_identifier"])
else:
    sample_data = deepcopy(y)
sample_data.reset_index(level=[0, 1], inplace=True)
print(sample_data.columns)

training_dataset = TimeSeriesDataSet(
    data=sample_data,
    time_idx="temporal_identifier",
    target="endogenous_variable_1",
    group_ids=["series_identifier"],
    max_encoder_length=20,
    min_encoder_length=20,
    min_prediction_idx=20,
    max_prediction_length=5,
    min_prediction_length=5,
    time_varying_unknown_reals=["endogenous_variable_1"],
    time_varying_known_reals=[] if X is None else ["exogenous_variable_1"],
)

training_dataset.get_parameters()

validation_dataset = TimeSeriesDataSet.from_dataset(
    training_dataset, sample_data, stop_randomization=True, predict=True
)

validation_dataset.get_parameters()

training_data_loader = training_dataset.to_dataloader(
    train=True, batch_size=5, num_workers=0
)

validation_data_loader = validation_dataset.to_dataloader(
    train=False, batch_size=5, num_workers=0
)

forecaster = model_class.from_dataset(
    training_dataset,
    log_val_interval=10 if log_val else -1,
)

forecaster.hparams
forecaster_copy = deepcopy(forecaster)

early_stop_callback = EarlyStopping(
    monitor="val_loss", min_delta=1e-4, patience=10, verbose=False, mode="min"
)
pytorch_trainer = Trainer(
    accelerator="cpu",
    max_epochs=500 if (not X_data and model_class is TemporalFusionTransformer) else 1,
    min_epochs=2,
    # callbacks=[early_stop_callback] if log_val else [],
)

start_time = time.time()
print("FIT START")
pytorch_trainer.fit(
    forecaster,
    train_dataloaders=training_data_loader,
    val_dataloaders=validation_data_loader,
)
print("FIT END")
print("TIME: ", time.time() - start_time)

for p, p_c in zip(forecaster.parameters(), forecaster_copy.parameters()):
    print((p - p_c).abs().sum())

yarnabrina (Collaborator) commented May 18, 2024

Test result:
| model | log_val_interval | X_data | Error | weights changed? |
| --- | --- | --- | --- | --- |
| Nbeats | yes | no | no | yes |
| NHiTS | yes | no | no | yes |
| TFT | yes | no | yes | - |
| TFT | no | yes | no | no |
| TFT | yes | yes | no | yes |
| Nbeats | yes | yes | yes | - |
| NHiTS | yes | yes | no | yes |

So, NBeats fails if X is passed, and TFT fails if X is not passed? That's weird.

But I don't understand why logging validation metrics would affect weight updates. Validation data shouldn't affect training at all (except for monitoring through callbacks etc., but not weight updates). Does that make any sense to you, @benHeid @fkiraly?

benHeid (Contributor) commented May 19, 2024

These errors are probably raised because the algorithm does not support that combination. At least the original variant of NBeats does not support exogenous features; only NBeatsX supports them. Regarding TFT, after briefly screening the original paper, it seems that this model always takes exogenous features as input.

Since this behavior is not detectable programmatically, I would propose that we add a dict describing valid combinations and raise an error otherwise.

@Xinyu-Wu-0000 (Contributor Author)

requires_X tag is defined only for transformers.

As @yarnabrina mentioned, requires_X is only for transformers now; can we extend it to forecasters?

_tags = {
    "tag_name": "ignores-exogeneous-X",
    "parent_type": "forecaster",
    "tag_type": "bool",
    "short_descr": "does forecaster make use of exogeneous data?",
    "user_facing": True,
}

With the requires_X tag and the ignores-exogeneous-X tag, we can raise an error according to the tags and the input.
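A minimal sketch of such a tag-based check (the function name is hypothetical, not sktime base-class code; it uses the two tag names discussed above):

```python
# Hypothetical sketch of tag-based input validation: raise if a forecaster
# that requires exogenous data is fitted without X; return whether the
# forecaster will actually use the X it was given.
def check_exogenous_input(tags, X):
    if tags.get("requires_X", False) and X is None:
        raise ValueError("This forecaster requires exogenous data X.")
    return X is not None and not tags.get("ignores-exogeneous-X", False)


# demo: a requires_X forecaster fitted without X should fail fast
try:
    check_exogenous_input({"requires_X": True}, X=None)
    raised = False
except ValueError:
    raised = True
```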

benHeid (Contributor) commented May 20, 2024

Yes, in that case a requires_X flag probably makes sense.

Xinyu-Wu-0000 (Contributor Author) commented May 20, 2024

As discussed in the mentor meeting and standups, there are two questions:

  • How should tests handle the index assumption for global forecasters?
    An example could be:

      fit:
      X: time index 0~100
      y: time index 0~100
    
      predict:
      X: time index 0~103
      y: time index 0~100
    
      output:
      y: time index 101~103
    

    Probably with multiindex or multiindex_hier levels.
    Probably the X, y for predict are from different instances than the X, y for fit.

  • How should the tag requires_X be considered in test scenario generation?

    Currently the TFT forecaster in pytorch-forecasting can't fit without X. Therefore we need to make sure the tests in CI pass X to forecasters with the tag requires_X. As @yarnabrina mentioned, requires_X is only for transformers now, so we need to extend it to forecasters.
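The index convention in the first bullet can be sketched like this (hypothetical helper, assuming plain integer time indexes): the forecast horizon is the set of time points present in the X passed to predict but absent from the y seen in fit.

```python
import pandas as pd


# Hypothetical sketch: derive the forecast horizon from the index mismatch
# between the X passed to predict and the y seen in fit.
def horizon_from_indexes(X_index, y_index):
    return X_index.difference(y_index)


X_index = pd.RangeIndex(0, 104)  # time index 0~103, passed to predict
y_index = pd.RangeIndex(0, 101)  # time index 0~100, seen in fit
horizon = horizon_from_indexes(X_index, y_index)  # time index 101~103
```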

fix X is None (Nbeats model can not handle X)
fix index names contain None
fix column names are not str type
fix just one timeseries
fix y type is pd.series
fkiraly (Collaborator) commented May 22, 2024

To summarize the discussion from discord: it would entail a serious API change if we allowed a case distinction where X is mandatory, with ripple effects through the API test suite etc.

For the same reason, YfromX - an estimator that also relies on X for the forecast - falls back on a dummy mode if no X is present. Imo we should deal with TFT in a similar way, e.g., replace a missing X with a constant dummy or similar.
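A minimal sketch of such a dummy-X fallback (function and column names are hypothetical, not the PR's actual implementation):

```python
import numpy as np
import pandas as pd


# Hypothetical sketch: when no exogenous data is supplied, fabricate a
# constant column aligned with y's index, so a model that always expects
# exogenous input (like TFT) can still fit.
def ensure_X(X, y, fill_value=0.0):
    if X is not None:
        return X
    return pd.DataFrame({"dummy_exogenous": fill_value}, index=y.index)


y = pd.DataFrame({"endogenous_variable_1": np.arange(4.0)})
X_dummy = ensure_X(None, y)
```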

@Xinyu-Wu-0000 (Contributor Author)

Missing X is now replaced by a constant dummy for TFT.
