mirror of
https://github.com/amazon-science/chronos-forecasting
synced 2026-05-22 09:09:44 +00:00
*Issue #, if available:* *Description of changes:* By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
346 lines
11 KiB
Python
346 lines
11 KiB
Python
import logging
|
|
from pathlib import Path
|
|
from typing import Iterable, Optional
|
|
|
|
import datasets
|
|
import numpy as np
|
|
import pandas as pd
|
|
import torch
|
|
import typer
|
|
import yaml
|
|
from gluonts.dataset.split import split
|
|
from gluonts.ev.metrics import MASE, MeanWeightedSumQuantileLoss
|
|
from gluonts.itertools import batcher
|
|
from gluonts.model.evaluation import evaluate_forecasts
|
|
from gluonts.model.forecast import QuantileForecast, SampleForecast
|
|
from tqdm.auto import tqdm
|
|
|
|
from chronos import BaseChronosPipeline, Chronos2Pipeline, ChronosBoltPipeline, ChronosPipeline, ForecastType
|
|
|
|
# Typer application object; the command functions in this script register on it
# through `@app.command()`. Typer's pretty exception rendering is switched off.
app = typer.Typer(pretty_exceptions_enable=False)
|
|
|
|
# Quantile levels requested from the pipelines and passed to the weighted
# quantile loss metric during evaluation.
QUANTILES = [
    0.1,
    0.2,
    0.3,
    0.4,
    0.5,
    0.6,
    0.7,
    0.8,
    0.9,
]
|
|
|
|
|
|
def to_gluonts_univariate(hf_dataset: datasets.Dataset):
    """Flatten a Hugging Face dataset into a list of univariate series entries.

    Every column whose feature type is ``datasets.Sequence`` -- except
    ``"timestamp"`` -- is treated as a target series, and each row contributes
    one entry per such column.

    Parameters
    ----------
    hf_dataset : datasets.Dataset
        Dataset with a ``"timestamp"`` sequence column and one or more
        sequence-valued target columns.

    Returns
    -------
    list of dict
        One dict per (row, target column) pair with the keys ``"start"``
        (a ``pd.Period`` built from the row's first timestamp) and
        ``"target"`` (the column's values for that row).

    Raises
    ------
    ValueError
        If ``"timestamp"`` is not among the sequence-typed columns.
    AssertionError
        If the number of entries differs from the "train" split's recorded
        ``num_examples`` multiplied by the number of target columns.
    """
    target_columns = [
        name for name, feature in hf_dataset.features.items() if isinstance(feature, datasets.Sequence)
    ]
    target_columns.remove("timestamp")
    expected_count = hf_dataset.info.splits["train"].num_examples * len(target_columns)

    # The frequency is inferred from the first row only, i.e. all series in the
    # dataset are assumed to share the same frequency.
    first_row_index = pd.DatetimeIndex(hf_dataset[0]["timestamp"])
    freq = first_row_index.to_period()[0].freqstr

    univariate_entries = [
        {
            "start": pd.Period(row["timestamp"][0], freq=freq),
            "target": row[column],
        }
        for row in hf_dataset
        for column in target_columns
    ]
    assert len(univariate_entries) == expected_count

    return univariate_entries
|
|
|
|
|
|
def load_and_split_dataset(backtest_config: dict):
    """Load one benchmark dataset and build its rolling evaluation windows.

    Parameters
    ----------
    backtest_config : dict
        Must provide the keys ``"hf_repo"``, ``"name"``, ``"offset"``,
        ``"prediction_length"`` and ``"num_rolls"``.

    Returns
    -------
    The object returned by ``generate_instances`` on the test template, built
    with the configured prediction length and ``windows=num_rolls``.
    """
    repo_id = backtest_config["hf_repo"]
    config_name = backtest_config["name"]
    split_offset = backtest_config["offset"]
    horizon = backtest_config["prediction_length"]
    window_count = backtest_config["num_rolls"]

    # The datasets in autogluon/chronos_datasets_extra cannot be distributed due
    # to license restrictions and must be generated on the fly, which is why
    # remote code is trusted for that repository only.
    allow_remote_code = repo_id == "autogluon/chronos_datasets_extra"

    hf_dataset = datasets.load_dataset(
        repo_id,
        config_name,
        split="train",
        trust_remote_code=allow_remote_code,
    )
    hf_dataset.set_format("numpy")

    univariate_series = to_gluonts_univariate(hf_dataset)

    # Only the test side of the split is needed for evaluation.
    _, test_template = split(univariate_series, offset=split_offset)
    return test_template.generate_instances(horizon, windows=window_count)
|
|
|
|
|
|
def generate_forecasts(
    test_data_input: Iterable,
    pipeline: BaseChronosPipeline,
    prediction_length: int,
    batch_size: int,
    **predict_kwargs,
):
    """Run the pipeline over the evaluation inputs and wrap the results as forecasts.

    Parameters
    ----------
    test_data_input : Iterable
        Entries providing ``"target"`` and ``"start"``. The iterable is walked
        twice (once for prediction, once to pair outputs with entries), so it
        must be re-iterable.
    pipeline : BaseChronosPipeline
        Pipeline whose ``predict_quantiles`` method produces the forecasts and
        whose ``forecast_type`` selects the forecast wrapper class.
    prediction_length : int
        Forecast horizon forwarded to ``predict_quantiles``.
    batch_size : int
        Number of entries sent to the pipeline per call.
    **predict_kwargs
        Extra keyword arguments forwarded unchanged to ``predict_quantiles``.

    Returns
    -------
    list
        ``SampleForecast`` objects when the pipeline's forecast type is
        ``ForecastType.SAMPLES``, ``QuantileForecast`` objects when it is
        ``ForecastType.QUANTILES``. Entries are skipped for any other type.
    """
    per_batch_outputs = []
    for entries in tqdm(batcher(test_data_input, batch_size=batch_size)):
        contexts = [torch.tensor(entry["target"]) for entry in entries]
        batch_quantiles, _ = pipeline.predict_quantiles(
            contexts,
            prediction_length=prediction_length,
            quantile_levels=QUANTILES,
            **predict_kwargs,
        )
        if isinstance(batch_quantiles, list):
            # Chronos-2 returns a list of tensors: stack them and drop axis 1.
            batch_quantiles = np.stack(batch_quantiles).squeeze(axis=1)
        # Swap the last two axes (presumably moving the quantile axis ahead of
        # the time axis, as the forecast classes expect -- TODO confirm).
        per_batch_outputs.append(batch_quantiles.swapaxes(-1, -2))
    all_outputs = np.concatenate(per_batch_outputs)

    # Wrap each per-series array in the gluonts forecast class matching the pipeline.
    forecasts = []
    for series_output, series in zip(all_outputs, test_data_input):
        # The forecast begins right after the last observed step of the context.
        start_date = series["start"] + len(series["target"])

        if pipeline.forecast_type == ForecastType.SAMPLES:
            forecast = SampleForecast(samples=series_output, start_date=start_date)
        elif pipeline.forecast_type == ForecastType.QUANTILES:
            forecast = QuantileForecast(
                forecast_arrays=series_output,
                forecast_keys=[str(level) for level in QUANTILES],
                start_date=start_date,
            )
        else:
            continue
        forecasts.append(forecast)

    return forecasts
|
|
|
|
|
|
def eval_pipeline_and_save_results(
    pipeline: BaseChronosPipeline,
    config_path: Path,
    metrics_path: Path,
    model_id: str,
    batch_size: int,
    **predict_kwargs,
):
    """Evaluate a pipeline on every dataset in a backtest config and save the metrics.

    Parameters
    ----------
    pipeline : BaseChronosPipeline
        Loaded pipeline used to generate the forecasts.
    config_path : Path
        YAML file holding a list of backtest configs. Each config must provide
        at least ``"name"`` and ``"prediction_length"`` plus the keys needed by
        ``load_and_split_dataset``.
    metrics_path : Path
        Destination CSV file for the metrics table.
    model_id : str
        Identifier written to the ``model`` column of the results.
    batch_size : int
        Inference batch size passed to ``generate_forecasts``.
    **predict_kwargs
        Extra keyword arguments forwarded to ``generate_forecasts``.

    Notes
    -----
    The CSV has one row per dataset, sorted by dataset name, with the columns
    ``MASE[0.5]`` and ``mean_weighted_sum_quantile_loss`` renamed to ``MASE``
    and ``WQL``.
    """
    # Look the logger up by name instead of relying on a module-level `logger`:
    # the script binds that global only inside its `__main__` guard, so calling
    # this function after importing the module would raise NameError.
    # `getLogger` returns the same logger object for the same name, so the level
    # configured by the script entry point still applies.
    logger = logging.getLogger("Chronos Evaluation")

    # Load backtest configs
    with open(config_path) as fp:
        backtest_configs = yaml.safe_load(fp)

    result_rows = []
    for config in backtest_configs:
        dataset_name = config["name"]
        prediction_length = config["prediction_length"]

        logger.info(f"Loading {dataset_name}")
        test_data = load_and_split_dataset(backtest_config=config)

        logger.info(f"Generating forecasts for {dataset_name} ({len(test_data.input)} time series)")
        forecasts = generate_forecasts(
            test_data.input,
            pipeline=pipeline,
            prediction_length=prediction_length,
            batch_size=batch_size,
            **predict_kwargs,
        )

        logger.info(f"Evaluating forecasts for {dataset_name}")
        metrics = (
            evaluate_forecasts(
                forecasts,
                test_data=test_data,
                metrics=[
                    MASE(),
                    MeanWeightedSumQuantileLoss(QUANTILES),
                ],
                batch_size=5000,
            )
            .reset_index(drop=True)
            .to_dict(orient="records")
        )
        # Only the first record of the metrics table is kept for each dataset.
        result_rows.append({"dataset": dataset_name, "model": model_id, **metrics[0]})

    # Save results to a CSV file
    results_df = (
        pd.DataFrame(result_rows)
        .rename(
            {"MASE[0.5]": "MASE", "mean_weighted_sum_quantile_loss": "WQL"},
            axis="columns",
        )
        .sort_values(by="dataset")
    )
    results_df.to_csv(metrics_path, index=False)
|
|
|
|
|
|
@app.command()
def chronos(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-t5-small",
    device: str = "cuda",
    torch_dtype: str = "bfloat16",
    batch_size: int = 32,
    num_samples: int = 20,
    temperature: Optional[float] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
):
    """Evaluate Chronos models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-t5-small"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-t5-tiny
            - amazon/chronos-t5-mini
            - amazon/chronos-t5-small
            - amazon/chronos-t5-base
            - amazon/chronos-t5-large
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "bfloat16"
        Name of the torch dtype used for the model, e.g. "bfloat16" or "float32"
    batch_size : int, optional, default = 32
        Batch size for inference. For Chronos-Bolt models, significantly larger
        batch sizes can be used
    num_samples : int, optional, default = 20
        Number of samples to draw when using the original Chronos models
    temperature : Optional[float], optional, default = None
        Softmax temperature to use for the original Chronos models. If None,
        the choice is left to the pipeline (presumably the model's configured value)
    top_k : Optional[int], optional, default = None
        Top-K sampling. If None, the choice is left to the pipeline
        (presumably the model's configured value)
    top_p : Optional[float], optional, default = None
        Top-p sampling. If None, the choice is left to the pipeline
        (presumably the model's configured value)

    Raises
    ------
    ValueError
        If torch_dtype does not name a torch dtype.
    TypeError
        If the loaded model is not a ChronosPipeline.
    """
    # Validate with explicit exceptions rather than `assert`: assertions are
    # stripped under `python -O`, which would let bad CLI input through.
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(
            f"torch_dtype must name a torch dtype such as 'bfloat16' or 'float32', got {torch_dtype!r}"
        )

    # Load Chronos
    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)

    # The sampling arguments forwarded below are only meant for the original
    # Chronos pipeline, so refuse any other pipeline class up front.
    if not isinstance(pipeline, ChronosPipeline):
        raise TypeError(
            f"Model {model_id!r} was loaded as {type(pipeline).__name__}, but this command requires a "
            "ChronosPipeline; use the command that matches the model family"
        )

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
        num_samples=num_samples,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )
|
|
|
|
|
|
@app.command()
def chronos_bolt(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-bolt-base",
    device: str = "cuda",
    torch_dtype: str = "float32",
    batch_size: int = 32,
):
    """Evaluate Chronos-Bolt models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-bolt-base"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-bolt-tiny
            - amazon/chronos-bolt-mini
            - amazon/chronos-bolt-small
            - amazon/chronos-bolt-base
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "float32"
        Name of the torch dtype used for the model, e.g. "float32" or "bfloat16"
    batch_size : int, optional, default = 32
        Batch size for inference. For Chronos-Bolt models, significantly larger
        batch sizes can be used

    Raises
    ------
    ValueError
        If torch_dtype does not name a torch dtype.
    TypeError
        If the loaded model is not a ChronosBoltPipeline.
    """
    # Validate with explicit exceptions rather than `assert`: assertions are
    # stripped under `python -O`, which would let bad CLI input through.
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(
            f"torch_dtype must name a torch dtype such as 'float32' or 'bfloat16', got {torch_dtype!r}"
        )

    # Load Chronos
    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)

    if not isinstance(pipeline, ChronosBoltPipeline):
        raise TypeError(
            f"Model {model_id!r} was loaded as {type(pipeline).__name__}, but this command requires a "
            "ChronosBoltPipeline; use the command that matches the model family"
        )

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
    )
|
|
|
|
|
|
@app.command()
def chronos_2(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-2",
    device: str = "cuda",
    torch_dtype: str = "float32",
    batch_size: int = 32,
    cross_learning: bool = False,
):
    """Evaluate Chronos-2 models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-2"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-2
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "float32"
        Name of the torch dtype used for the model, e.g. "float32" or "bfloat16"
    batch_size : int, optional, default = 32
        Batch size for inference
    cross_learning : bool, optional, default = False
        If True, cross-learning is enabled and the model makes joint predictions
        for all items in the batch

    Raises
    ------
    ValueError
        If torch_dtype does not name a torch dtype.
    TypeError
        If the loaded model is not a Chronos2Pipeline.
    """
    # Validate with explicit exceptions rather than `assert`: assertions are
    # stripped under `python -O`, which would let bad CLI input through.
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(
            f"torch_dtype must name a torch dtype such as 'float32' or 'bfloat16', got {torch_dtype!r}"
        )

    # Load Chronos
    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)

    # `cross_learning` is forwarded to the pipeline below, so make sure the
    # loaded model really is a Chronos-2 pipeline before passing it on.
    if not isinstance(pipeline, Chronos2Pipeline):
        raise TypeError(
            f"Model {model_id!r} was loaded as {type(pipeline).__name__}, but this command requires a "
            "Chronos2Pipeline; use the command that matches the model family"
        )

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
        cross_learning=cross_learning,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point. `logger` is bound as a module global here because the
    # evaluation code refers to it by that name.
    logger = logging.getLogger("Chronos Evaluation")
    logger.setLevel(logging.INFO)

    # Install the log record format on the root logger before any command runs.
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Hand control to the Typer app, which dispatches to the selected command.
    app()
|