chronos-forecasting/scripts/evaluation/evaluate.py
Abdul Fatir fdda16fc02
Bump version from 2.2.0rc4 to 2.2.0 (#426)
*Issue #, if available:*

*Description of changes:*


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
2025-12-08 13:49:34 +01:00

346 lines
11 KiB
Python

import logging
from pathlib import Path
from typing import Iterable, Optional
import datasets
import numpy as np
import pandas as pd
import torch
import typer
import yaml
from gluonts.dataset.split import split
from gluonts.ev.metrics import MASE, MeanWeightedSumQuantileLoss
from gluonts.itertools import batcher
from gluonts.model.evaluation import evaluate_forecasts
from gluonts.model.forecast import QuantileForecast, SampleForecast
from tqdm.auto import tqdm
from chronos import BaseChronosPipeline, Chronos2Pipeline, ChronosBoltPipeline, ChronosPipeline, ForecastType
app = typer.Typer(pretty_exceptions_enable=False)
QUANTILES = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
def to_gluonts_univariate(hf_dataset: datasets.Dataset):
series_fields = [col for col in hf_dataset.features if isinstance(hf_dataset.features[col], datasets.Sequence)]
series_fields.remove("timestamp")
dataset_length = hf_dataset.info.splits["train"].num_examples * len(series_fields)
# Assumes that all time series in the dataset have the same frequency
dataset_freq = pd.DatetimeIndex(hf_dataset[0]["timestamp"]).to_period()[0].freqstr
gts_dataset = []
for hf_entry in hf_dataset:
for field in series_fields:
gts_dataset.append(
{
"start": pd.Period(
hf_entry["timestamp"][0],
freq=dataset_freq,
),
"target": hf_entry[field],
}
)
assert len(gts_dataset) == dataset_length
return gts_dataset
def load_and_split_dataset(backtest_config: dict):
hf_repo = backtest_config["hf_repo"]
dataset_name = backtest_config["name"]
offset = backtest_config["offset"]
prediction_length = backtest_config["prediction_length"]
num_rolls = backtest_config["num_rolls"]
# This is needed because the datasets in autogluon/chronos_datasets_extra cannot
# be distribued due to license restrictions and must be generated on the fly
trust_remote_code = True if hf_repo == "autogluon/chronos_datasets_extra" else False
ds = datasets.load_dataset(hf_repo, dataset_name, split="train", trust_remote_code=trust_remote_code)
ds.set_format("numpy")
gts_dataset = to_gluonts_univariate(ds)
# Split dataset for evaluation
_, test_template = split(gts_dataset, offset=offset)
test_data = test_template.generate_instances(prediction_length, windows=num_rolls)
return test_data
def generate_forecasts(
test_data_input: Iterable,
pipeline: BaseChronosPipeline,
prediction_length: int,
batch_size: int,
**predict_kwargs,
):
# Generate forecasts
forecast_outputs = []
for batch in tqdm(batcher(test_data_input, batch_size=batch_size)):
context = [torch.tensor(entry["target"]) for entry in batch]
quantiles, _ = pipeline.predict_quantiles(
context,
prediction_length=prediction_length,
quantile_levels=QUANTILES,
**predict_kwargs,
)
if isinstance(quantiles, list):
# This is needed for Chronos-2 support which returns a list of tensors
quantiles = np.stack(quantiles).squeeze(axis=1)
quantiles = quantiles.swapaxes(-1, -2)
forecast_outputs.append(quantiles)
forecast_outputs = np.concatenate(forecast_outputs)
# Convert forecast samples into gluonts Forecast objects
forecasts = []
for item, ts in zip(forecast_outputs, test_data_input):
forecast_start_date = ts["start"] + len(ts["target"])
if pipeline.forecast_type == ForecastType.SAMPLES:
forecasts.append(SampleForecast(samples=item, start_date=forecast_start_date))
elif pipeline.forecast_type == ForecastType.QUANTILES:
forecasts.append(
QuantileForecast(
forecast_arrays=item,
forecast_keys=list(map(str, QUANTILES)),
start_date=forecast_start_date,
)
)
return forecasts
def eval_pipeline_and_save_results(
pipeline: BaseChronosPipeline,
config_path: Path,
metrics_path: Path,
model_id: str,
batch_size: int,
**predict_kwargs,
):
# Load backtest configs
with open(config_path) as fp:
backtest_configs = yaml.safe_load(fp)
result_rows = []
for config in backtest_configs:
dataset_name = config["name"]
prediction_length = config["prediction_length"]
logger.info(f"Loading {dataset_name}")
test_data = load_and_split_dataset(backtest_config=config)
logger.info(f"Generating forecasts for {dataset_name} ({len(test_data.input)} time series)")
forecasts = generate_forecasts(
test_data.input,
pipeline=pipeline,
prediction_length=prediction_length,
batch_size=batch_size,
**predict_kwargs,
)
logger.info(f"Evaluating forecasts for {dataset_name}")
metrics = (
evaluate_forecasts(
forecasts,
test_data=test_data,
metrics=[
MASE(),
MeanWeightedSumQuantileLoss(QUANTILES),
],
batch_size=5000,
)
.reset_index(drop=True)
.to_dict(orient="records")
)
result_rows.append({"dataset": dataset_name, "model": model_id, **metrics[0]})
# Save results to a CSV file
results_df = (
pd.DataFrame(result_rows)
.rename(
{"MASE[0.5]": "MASE", "mean_weighted_sum_quantile_loss": "WQL"},
axis="columns",
)
.sort_values(by="dataset")
)
results_df.to_csv(metrics_path, index=False)
@app.command()
def chronos(
config_path: Path,
metrics_path: Path,
model_id: str = "amazon/chronos-t5-small",
device: str = "cuda",
torch_dtype: str = "bfloat16",
batch_size: int = 32,
num_samples: int = 20,
temperature: Optional[float] = None,
top_k: Optional[int] = None,
top_p: Optional[float] = None,
):
"""Evaluate Chronos models.
Parameters
----------
config_path : Path
Path to the evaluation config. See ./configs/.
metrics_path : Path
Path to the CSV file where metrics will be saved.
model_id : str, optional, default = "amazon/chronos-t5-small"
HuggingFace ID of the Chronos model or local path
Available models IDs:
- amazon/chronos-t5-tiny
- amazon/chronos-t5-mini
- amazon/chronos-t5-small
- amazon/chronos-t5-base
- amazon/chronos-t5-large
device : str, optional, default = "cuda"
Device on which inference will be performed
torch_dtype : str, optional
Model's dtype, by default "bfloat16"
batch_size : int, optional, default = 32
Batch size for inference. For Chronos-Bolt models, significantly larger
batch sizes can be used
num_samples : int, optional, default = 20
Number of samples to draw when using the original Chronos models
temperature : Optional[float], optional, default = 1.0
Softmax temperature to used for the original Chronos models
top_k : Optional[int], optional, default = 50
Top-K sampling, by default None
top_p : Optional[float], optional, default = 1.0
Top-p sampling, by default None
"""
if isinstance(torch_dtype, str):
torch_dtype = getattr(torch, torch_dtype)
assert isinstance(torch_dtype, torch.dtype)
# Load Chronos
pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=torch_dtype)
assert isinstance(pipeline, ChronosPipeline)
eval_pipeline_and_save_results(
pipeline=pipeline,
config_path=config_path,
metrics_path=metrics_path,
model_id=model_id,
batch_size=batch_size,
num_samples=num_samples,
temperature=temperature,
top_k=top_k,
top_p=top_p,
)
@app.command()
def chronos_bolt(
config_path: Path,
metrics_path: Path,
model_id: str = "amazon/chronos-bolt-base",
device: str = "cuda",
torch_dtype: str = "float32",
batch_size: int = 32,
):
"""Evaluate Chronos-Bolt models.
Parameters
----------
config_path : Path
Path to the evaluation config. See ./configs/.
metrics_path : Path
Path to the CSV file where metrics will be saved.
model_id : str, optional, default = "amazon/chronos-bolt-base"
HuggingFace ID of the Chronos model or local path
Available model IDs:
- amazon/chronos-bolt-tiny
- amazon/chronos-bolt-mini
- amazon/chronos-bolt-small
- amazon/chronos-bolt-base
device : str, optional, default = "cuda"
Device on which inference will be performed
torch_dtype : str, optional
Model's dtype, by default "bfloat16"
batch_size : int, optional, default = 32
Batch size for inference. For Chronos-Bolt models, significantly larger
batch sizes can be used
"""
if isinstance(torch_dtype, str):
torch_dtype = getattr(torch, torch_dtype)
assert isinstance(torch_dtype, torch.dtype)
# Load Chronos
pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=torch_dtype)
assert isinstance(pipeline, ChronosBoltPipeline)
eval_pipeline_and_save_results(
pipeline=pipeline,
config_path=config_path,
metrics_path=metrics_path,
model_id=model_id,
batch_size=batch_size,
)
@app.command()
def chronos_2(
config_path: Path,
metrics_path: Path,
model_id: str = "amazon/chronos-2",
device: str = "cuda",
torch_dtype: str = "float32",
batch_size: int = 32,
cross_learning: bool = False,
):
"""Evaluate Chronos-2 models.
Parameters
----------
config_path : Path
Path to the evaluation config. See ./configs/.
metrics_path : Path
Path to the CSV file where metrics will be saved.
model_id : str, optional, default = "amazon/chronos-2" FIXME
HuggingFace ID of the Chronos model or local path
Available model IDs:
- amazon/chronos-2 FIXME
device : str, optional, default = "cuda"
Device on which inference will be performed
torch_dtype : str, optional
Model's dtype, by default "bfloat16"
batch_size : int, optional, default = 32
Batch size for inference. For Chronos-Bolt models, significantly larger
batch sizes can be used
cross_learning: bool, optional, default = False
If True, cross-learning is enables and model makes joint predictions for all
items in the batch
"""
if isinstance(torch_dtype, str):
torch_dtype = getattr(torch, torch_dtype)
assert isinstance(torch_dtype, torch.dtype)
# Load Chronos
pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=torch_dtype)
assert isinstance(pipeline, Chronos2Pipeline)
eval_pipeline_and_save_results(
pipeline=pipeline,
config_path=config_path,
metrics_path=metrics_path,
model_id=model_id,
batch_size=batch_size,
cross_learning=cross_learning,
)
if __name__ == "__main__":
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("Chronos Evaluation")
logger.setLevel(logging.INFO)
app()