import logging
from pathlib import Path
from typing import Iterable, Optional

import datasets
import numpy as np
import pandas as pd
import torch
import typer
import yaml
from gluonts.dataset.split import split
from gluonts.ev.metrics import MASE, MeanWeightedSumQuantileLoss
from gluonts.itertools import batcher
from gluonts.model.evaluation import evaluate_forecasts
from gluonts.model.forecast import QuantileForecast, SampleForecast
from tqdm.auto import tqdm

from chronos import BaseChronosPipeline, Chronos2Pipeline, ChronosBoltPipeline, ChronosPipeline, ForecastType

app = typer.Typer(pretty_exceptions_enable=False)

# Bound at module level (not inside the ``__main__`` guard) so that functions
# using it also work when this file is imported rather than run as a script.
logger = logging.getLogger("Chronos Evaluation")

QUANTILES = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]


def to_gluonts_univariate(hf_dataset: datasets.Dataset):
    """Flatten a HuggingFace dataset into a list of univariate GluonTS-style entries.

    Every ``datasets.Sequence`` column other than ``"timestamp"`` is treated as
    a separate target series, so one dataset row yields one entry per such column.

    Parameters
    ----------
    hf_dataset : datasets.Dataset
        Dataset with a ``"timestamp"`` sequence column and at least one other
        sequence column. Its ``info.splits["train"]`` metadata is read to
        sanity-check the number of produced entries.

    Returns
    -------
    list of dict
        One ``{"start": pd.Period, "target": <series>}`` dict per (row, series column) pair.

    Raises
    ------
    ValueError
        If the dataset has no ``"timestamp"`` sequence column (from ``list.remove``).
    """
    series_fields = [col for col in hf_dataset.features if isinstance(hf_dataset.features[col], datasets.Sequence)]
    series_fields.remove("timestamp")
    dataset_length = hf_dataset.info.splits["train"].num_examples * len(series_fields)

    # Assumes that all time series in the dataset have the same frequency
    dataset_freq = pd.DatetimeIndex(hf_dataset[0]["timestamp"]).to_period()[0].freqstr

    gts_dataset = []
    for hf_entry in hf_dataset:
        # The start period depends only on the row, so compute it once per row
        # instead of once per series column.
        start = pd.Period(hf_entry["timestamp"][0], freq=dataset_freq)
        gts_dataset.extend({"start": start, "target": hf_entry[field]} for field in series_fields)

    # Internal consistency check against the split metadata
    assert len(gts_dataset) == dataset_length

    return gts_dataset


def load_and_split_dataset(backtest_config: dict):
    """Load one benchmark dataset and build its rolling-window test instances.

    Parameters
    ----------
    backtest_config : dict
        Must contain the keys ``"hf_repo"``, ``"name"``, ``"offset"``,
        ``"prediction_length"`` and ``"num_rolls"``.

    Returns
    -------
    The object returned by ``generate_instances`` on the GluonTS test template,
    built with ``prediction_length`` and ``windows=num_rolls``.
    """
    hf_repo = backtest_config["hf_repo"]
    dataset_name = backtest_config["name"]
    offset = backtest_config["offset"]
    prediction_length = backtest_config["prediction_length"]
    num_rolls = backtest_config["num_rolls"]

    # This is needed because the datasets in autogluon/chronos_datasets_extra cannot
    # be distributed due to license restrictions and must be generated on the fly
    trust_remote_code = hf_repo == "autogluon/chronos_datasets_extra"

    ds = datasets.load_dataset(hf_repo, dataset_name, split="train", trust_remote_code=trust_remote_code)
    ds.set_format("numpy")

    gts_dataset = to_gluonts_univariate(ds)

    # Split dataset for evaluation
    _, test_template = split(gts_dataset, offset=offset)
    test_data = test_template.generate_instances(prediction_length, windows=num_rolls)

    return test_data


def generate_forecasts(
    test_data_input: Iterable,
    pipeline: BaseChronosPipeline,
    prediction_length: int,
    batch_size: int,
    **predict_kwargs,
):
    """Run the pipeline over the test inputs and wrap the results as GluonTS forecasts.

    Parameters
    ----------
    test_data_input : Iterable
        Entries with ``"start"`` and ``"target"`` fields. It is iterated twice
        (once for batching, once to pair forecasts with entries), so it must be
        re-iterable.
    pipeline : BaseChronosPipeline
        Pipeline whose ``predict_quantiles`` method and ``forecast_type``
        attribute are used.
    prediction_length : int
        Forecast horizon passed to ``predict_quantiles``.
    batch_size : int
        Number of series sent to the pipeline per call.
    **predict_kwargs
        Extra keyword arguments forwarded unchanged to ``predict_quantiles``.

    Returns
    -------
    list
        One ``SampleForecast`` or ``QuantileForecast`` per input entry,
        depending on ``pipeline.forecast_type``.

    Raises
    ------
    ValueError
        If ``pipeline.forecast_type`` is neither ``ForecastType.SAMPLES`` nor
        ``ForecastType.QUANTILES``.
    """
    # Generate forecasts
    forecast_outputs = []
    for batch in tqdm(batcher(test_data_input, batch_size=batch_size)):
        context = [torch.tensor(entry["target"]) for entry in batch]
        quantiles, _ = pipeline.predict_quantiles(
            context,
            prediction_length=prediction_length,
            quantile_levels=QUANTILES,
            **predict_kwargs,
        )
        if isinstance(quantiles, list):
            # This is needed for Chronos-2 support which returns a list of tensors
            quantiles = np.stack(quantiles).squeeze(axis=1)
        # Swap the last two axes so each item can be passed as `forecast_arrays`
        # alongside one key per quantile level below.
        quantiles = quantiles.swapaxes(-1, -2)
        forecast_outputs.append(quantiles)
    forecast_outputs = np.concatenate(forecast_outputs)

    # Convert forecast samples into gluonts Forecast objects
    forecast_keys = list(map(str, QUANTILES))
    forecasts = []
    for item, ts in zip(forecast_outputs, test_data_input):
        forecast_start_date = ts["start"] + len(ts["target"])

        if pipeline.forecast_type == ForecastType.SAMPLES:
            # NOTE(review): `item` comes from `predict_quantiles`, so the values
            # wrapped as samples here are quantile predictions - confirm intended.
            forecasts.append(SampleForecast(samples=item, start_date=forecast_start_date))
        elif pipeline.forecast_type == ForecastType.QUANTILES:
            forecasts.append(
                QuantileForecast(
                    forecast_arrays=item,
                    forecast_keys=forecast_keys,
                    start_date=forecast_start_date,
                )
            )
        else:
            # Previously such items were skipped silently, leaving fewer
            # forecasts than test entries.
            raise ValueError(f"Unsupported forecast type: {pipeline.forecast_type!r}")

    return forecasts


def eval_pipeline_and_save_results(
    pipeline: BaseChronosPipeline,
    config_path: Path,
    metrics_path: Path,
    model_id: str,
    batch_size: int,
    **predict_kwargs,
):
    """Evaluate a pipeline on every dataset in a config file and write the metrics to CSV.

    Parameters
    ----------
    pipeline : BaseChronosPipeline
        Loaded pipeline used to generate forecasts.
    config_path : Path
        YAML file containing a list of backtest configs; each needs the keys
        read by ``load_and_split_dataset``.
    metrics_path : Path
        Destination CSV file. One row per dataset, sorted by dataset name.
    model_id : str
        Value written to the ``model`` column of every row.
    batch_size : int
        Inference batch size passed to ``generate_forecasts``.
    **predict_kwargs
        Extra keyword arguments forwarded to ``generate_forecasts``.
    """
    # Load backtest configs
    with open(config_path, encoding="utf-8") as fp:
        backtest_configs = yaml.safe_load(fp)

    result_rows = []
    for config in backtest_configs:
        dataset_name = config["name"]
        prediction_length = config["prediction_length"]

        logger.info(f"Loading {dataset_name}")
        test_data = load_and_split_dataset(backtest_config=config)

        logger.info(f"Generating forecasts for {dataset_name} ({len(test_data.input)} time series)")
        forecasts = generate_forecasts(
            test_data.input,
            pipeline=pipeline,
            prediction_length=prediction_length,
            batch_size=batch_size,
            **predict_kwargs,
        )

        logger.info(f"Evaluating forecasts for {dataset_name}")
        metrics = (
            evaluate_forecasts(
                forecasts,
                test_data=test_data,
                metrics=[
                    MASE(),
                    MeanWeightedSumQuantileLoss(QUANTILES),
                ],
                batch_size=5000,
            )
            .reset_index(drop=True)
            .to_dict(orient="records")
        )
        # Only the first record of the evaluation result is kept per dataset
        result_rows.append({"dataset": dataset_name, "model": model_id, **metrics[0]})

    # Save results to a CSV file
    results_df = (
        pd.DataFrame(result_rows)
        .rename(
            {"MASE[0.5]": "MASE", "mean_weighted_sum_quantile_loss": "WQL"},
            axis="columns",
        )
        .sort_values(by="dataset")
    )
    results_df.to_csv(metrics_path, index=False)


def _load_pipeline(model_id: str, device: str, torch_dtype, expected_type: type):
    """Resolve the dtype, load the pipeline and verify it is an ``expected_type`` instance.

    Parameters
    ----------
    model_id : str
        Model identifier passed to ``BaseChronosPipeline.from_pretrained``.
    device : str
        Passed to ``from_pretrained`` as ``device_map``.
    torch_dtype : str or torch.dtype
        Either a ``torch.dtype`` or the name of one (looked up as an attribute of ``torch``).
    expected_type : type
        Pipeline class the loaded object must be an instance of.

    Raises
    ------
    ValueError
        If ``torch_dtype`` does not resolve to a ``torch.dtype``.
    TypeError
        If the loaded pipeline is not an instance of ``expected_type``.
    """
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    # Explicit checks instead of `assert`, which is stripped when running with -O
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(f"torch_dtype must name a torch dtype (e.g. 'float32'), got {torch_dtype!r}")

    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)
    if not isinstance(pipeline, expected_type):
        raise TypeError(
            f"Model {model_id!r} loaded as {type(pipeline).__name__}, "
            f"but this command expects {expected_type.__name__}"
        )
    return pipeline


@app.command()
def chronos(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-t5-small",
    device: str = "cuda",
    torch_dtype: str = "bfloat16",
    batch_size: int = 32,
    num_samples: int = 20,
    temperature: Optional[float] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
):
    """Evaluate Chronos models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-t5-small"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-t5-tiny
            - amazon/chronos-t5-mini
            - amazon/chronos-t5-small
            - amazon/chronos-t5-base
            - amazon/chronos-t5-large
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "bfloat16"
        Model's dtype
    batch_size : int, optional, default = 32
        Batch size for inference. For Chronos-Bolt models, significantly larger
        batch sizes can be used
    num_samples : int, optional, default = 20
        Number of samples to draw when using the original Chronos models
    temperature : Optional[float], optional, default = None
        Softmax temperature to use for the original Chronos models.
        The value (including None) is forwarded as-is to the pipeline.
    top_k : Optional[int], optional, default = None
        Top-K sampling. The value (including None) is forwarded as-is to the pipeline.
    top_p : Optional[float], optional, default = None
        Top-p sampling. The value (including None) is forwarded as-is to the pipeline.
    """
    # Load Chronos
    pipeline = _load_pipeline(model_id, device, torch_dtype, ChronosPipeline)

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
        num_samples=num_samples,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )


@app.command()
def chronos_bolt(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-bolt-base",
    device: str = "cuda",
    torch_dtype: str = "float32",
    batch_size: int = 32,
):
    """Evaluate Chronos-Bolt models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-bolt-base"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-bolt-tiny
            - amazon/chronos-bolt-mini
            - amazon/chronos-bolt-small
            - amazon/chronos-bolt-base
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "float32"
        Model's dtype
    batch_size : int, optional, default = 32
        Batch size for inference. For Chronos-Bolt models, significantly larger
        batch sizes can be used
    """
    # Load Chronos
    pipeline = _load_pipeline(model_id, device, torch_dtype, ChronosBoltPipeline)

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
    )


@app.command()
def chronos_2(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-2",
    device: str = "cuda",
    torch_dtype: str = "float32",
    batch_size: int = 32,
    cross_learning: bool = False,
):
    """Evaluate Chronos-2 models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-2"
        HuggingFace ID of the Chronos-2 model or local path
        Available model IDs:
            - amazon/chronos-2
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "float32"
        Model's dtype
    batch_size : int, optional, default = 32
        Batch size for inference
    cross_learning : bool, optional, default = False
        If True, cross-learning is enabled and the model makes joint
        predictions for all items in the batch
    """
    # Load Chronos
    pipeline = _load_pipeline(model_id, device, torch_dtype, Chronos2Pipeline)

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
        cross_learning=cross_learning,
    )


if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    logger.setLevel(logging.INFO)
    app()