Merge branch 'main' into chronos2-js

Kashif Rasul 2026-02-15 21:43:11 +01:00 committed by GitHub
commit 35bd38ffc4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 3541 additions and 6568 deletions

2
.gitattributes vendored
View file

@ -1 +1 @@
.ipynb linguist-documentation
*.ipynb linguist-language=Python

View file

@ -30,4 +30,4 @@ CUDA version:
PyTorch version:
HuggingFace transformers version:
HuggingFace accelerate version:
Pandas version:

View file

@ -36,13 +36,13 @@ jobs:
run: pip install ".[dev]" -f https://download.pytorch.org/whl/cpu/torch_stable.html
- name: Run Eval Script for Chronos-2
run: python scripts/evaluation/evaluate.py chronos-2 ci/evaluate/backtest_config.yaml $CHRONOS_2_RESULTS_CSV --model-id=s3://autogluon/chronos-2 --device=cpu --torch-dtype=float32
run: python scripts/evaluation/evaluate.py chronos-2 ci/evaluate/backtest_config.yaml $CHRONOS_2_RESULTS_CSV --model-id=amazon/chronos-2 --device=cpu --torch-dtype=float32
- name: Print Chronos-2 CSV
run: cat $CHRONOS_2_RESULTS_CSV
- name: Run Eval Script for Chronos-Bolt
run: python scripts/evaluation/evaluate.py chronos-bolt ci/evaluate/backtest_config.yaml $CHRONOS_BOLT_RESULTS_CSV --model-id=amazon/chronos-bolt-small --device=cpu --torch-dtype=float32
- name: Print Chronos-Bolt CSV
run: cat $CHRONOS_BOLT_RESULTS_CSV

7
.gitignore vendored
View file

@ -160,4 +160,9 @@ cython_debug/
#.idea/
# macOS stuff
.DS_store
.DS_store
chronos-2-finetuned
# Kiro IDE
.kiro

View file

@ -1,7 +1,3 @@
<div align="center">
<img src="https://raw.githubusercontent.com/amazon-science/chronos-forecasting/main/figures/chronos-logo.png" width="60%">
</div>
<div align="center">
# Chronos: Pretrained Models for Time Series Forecasting
@ -11,7 +7,7 @@
[![huggingface](https://img.shields.io/badge/%F0%9F%A4%97%20HF-Datasets-FFD21E)](https://huggingface.co/datasets/autogluon/chronos_datasets)
[![huggingface](https://img.shields.io/badge/%F0%9F%A4%97%20HF-Models-FFD21E)](https://huggingface.co/collections/amazon/chronos-models-65f1791d630a8d57cb718444)
[![fev](https://img.shields.io/static/v1?label=fev&message=Benchmark&color=B31B1B&logo=github)](https://github.com/autogluon/fev)
[![aws](https://img.shields.io/static/v1?label=SageMaker&message=Deploy&color=FF9900&logo=amazon-web-services)](notebooks/deploy-chronos-bolt-to-amazon-sagemaker.ipynb)
[![aws](https://img.shields.io/static/v1?label=SageMaker&message=Deploy&color=FF9900&logo=amazon-web-services)](notebooks/deploy-chronos-to-amazon-sagemaker.ipynb)
[![faq](https://img.shields.io/badge/FAQ-Questions%3F-blue)](https://github.com/amazon-science/chronos-forecasting/issues?q=is%3Aissue+label%3AFAQ)
[![License: Apache-2.0](https://img.shields.io/badge/License-Apache--2.0-green.svg)](https://opensource.org/licenses/Apache-2.0)
@ -19,8 +15,8 @@
## 🚀 News
- **20 Oct 2025**: 🚀 [Chronos-2](https://arxiv.org/abs/2510.15821) released. It offers _zero-shot_ support for univariate, multivariate, and covariate-informed forecasting tasks. Chronos-2 achieves the best performance on fev-bench, GIFT-Eval and Chronos Benchmark II amongst pretrained models. Check out [this notebook](notebooks/chronos-2-quickstart.ipynb) to get started with Chronos-2.
- **14 Feb 2025**: 🚀 Chronos-Bolt is now available on Amazon SageMaker JumpStart! Check out the [tutorial notebook](notebooks/deploy-chronos-bolt-to-amazon-sagemaker.ipynb) to learn how to deploy Chronos endpoints for production use in 3 lines of code.
- **30 Dec 2025**: ☁️ Deploy Chronos-2 to AWS with Amazon SageMaker: new guide covers real-time inference (GPU/CPU), serverless endpoints with automatic scaling, and batch transform for large-scale forecasting. See the [deployment tutorial](notebooks/deploy-chronos-to-amazon-sagemaker.ipynb).
- **20 Oct 2025**: 🚀 [Chronos-2](https://huggingface.co/amazon/chronos-2) released. It offers _zero-shot_ support for univariate, multivariate, and covariate-informed forecasting tasks. Chronos-2 achieves the best performance on fev-bench, GIFT-Eval and Chronos Benchmark II amongst pretrained models. Check out [this notebook](notebooks/chronos-2-quickstart.ipynb) to get started with Chronos-2.
- **12 Dec 2024**: 📊 We released [`fev`](https://github.com/autogluon/fev), a lightweight package for benchmarking time series forecasting models based on the [Hugging Face `datasets`](https://huggingface.co/docs/datasets/en/index) library.
- **26 Nov 2024**: ⚡️ Chronos-Bolt models released [on HuggingFace](https://huggingface.co/collections/amazon/chronos-models-65f1791d630a8d57cb718444). Chronos-Bolt models are more accurate (5% lower error), up to 250x faster and 20x more memory efficient than the original Chronos models of the same size!
- **13 Mar 2024**: 🚀 Chronos [paper](https://arxiv.org/abs/2403.07815) and inference code released.
@ -39,7 +35,9 @@ This package provides an interface to the Chronos family of **pretrained time se
| Model ID | Parameters |
| ---------------------------------------------------------------------- | ---------- |
| [`s3://autogluon/chronos-2`](https://arxiv.org/abs/2510.15821) | 120M |
| [`amazon/chronos-2`](https://huggingface.co/amazon/chronos-2) | 120M |
| [`autogluon/chronos-2-synth`](https://huggingface.co/autogluon/chronos-2-synth) | 120M |
| [`autogluon/chronos-2-small`](https://huggingface.co/autogluon/chronos-2-small) | 28M |
| [`amazon/chronos-bolt-tiny`](https://huggingface.co/amazon/chronos-bolt-tiny) | 9M |
| [`amazon/chronos-bolt-mini`](https://huggingface.co/amazon/chronos-bolt-mini) | 21M |
| [`amazon/chronos-bolt-small`](https://huggingface.co/amazon/chronos-bolt-small) | 48M |
@ -48,7 +46,7 @@ This package provides an interface to the Chronos family of **pretrained time se
| [`amazon/chronos-t5-mini`](https://huggingface.co/amazon/chronos-t5-mini) | 20M |
| [`amazon/chronos-t5-small`](https://huggingface.co/amazon/chronos-t5-small) | 46M |
| [`amazon/chronos-t5-base`](https://huggingface.co/amazon/chronos-t5-base) | 200M |
| [`amazon/chronos-t5-large`](https://huggingface.co/amazon/chronos-t5-large) | 710M |
| [`amazon/chronos-t5-large`](https://huggingface.co/amazon/chronos-t5-large) | 710M |
</div>
@ -60,6 +58,10 @@ To perform inference with Chronos, the easiest way is to install this package th
pip install chronos-forecasting
```
> [!TIP]
> For reliable production use, we recommend using Chronos-2 models through [Amazon SageMaker JumpStart](https://aws.amazon.com/sagemaker/ai/jumpstart/). Check out [this tutorial](notebooks/deploy-chronos-to-amazon-sagemaker.ipynb) to learn how to deploy Chronos-2 inference endpoints to AWS with just a few lines of code.
### Forecasting
A minimal example showing how to perform forecasting using Chronos-2:
@ -68,7 +70,7 @@ A minimal example showing how to perform forecasting using Chronos-2:
import pandas as pd # requires: pip install 'pandas[pyarrow]'
from chronos import Chronos2Pipeline
pipeline = Chronos2Pipeline.from_pretrained("s3://autogluon/chronos-2", device_map="cuda")
pipeline = Chronos2Pipeline.from_pretrained("amazon/chronos-2", device_map="cuda")
# Load historical target values and past values of covariates
context_df = pd.read_parquet("https://autogluon.s3.amazonaws.com/datasets/timeseries/electricity_price/train.parquet")
@ -115,8 +117,15 @@ plt.legend()
## Example Notebooks
- [Chronos-2 Quick Start](notebooks/chronos-2-quickstart.ipynb)
- [Deploy Chronos-Bolt on Amazon SageMaker](notebooks/deploy-chronos-bolt-to-amazon-sagemaker.ipynb)
- Deploy Chronos-2 on Amazon SageMaker (coming soon!)
&nbsp;
<a href="https://studiolab.sagemaker.aws/import/github/amazon-science/chronos-forecasting/blob/main/notebooks/chronos-2-quickstart.ipynb">
<img src="https://studiolab.sagemaker.aws/studiolab.svg" alt="Open In SageMaker Studio Lab" height="18" align="absmiddle">
</a>
&nbsp;
<a href="https://colab.research.google.com/github/amazon-science/chronos-forecasting/blob/main/notebooks/chronos-2-quickstart.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" height="18" align="absmiddle">
</a>
- [Deploy Chronos-2 on Amazon SageMaker](notebooks/deploy-chronos-to-amazon-sagemaker.ipynb)
## 📝 Citation

Binary file not shown (before: 118 KiB).

Binary file not shown (before: 227 KiB).

File diff suppressed because it is too large (before: 149 KiB).

File diff suppressed because one or more lines are too long.

File diff suppressed because it is too large.

File diff suppressed because it is too large.

View file

@ -14,13 +14,12 @@ readme = "README.md"
license = { file = "LICENSE" }
requires-python = ">=3.10"
dependencies = [
"torch>=2.0,<3",
"transformers>=4.49,<5",
"torch>=2.2,<3",
"transformers>=4.41,<5",
"accelerate>=0.34,<2",
"numpy>=1.21,<3",
"einops>=0.7.0,<1",
"scikit-learn>=1.6.0,<2",
"boto3",
]
classifiers = [
"Programming Language :: Python :: 3",
@ -40,7 +39,19 @@ packages = ["src/chronos"]
path = "src/chronos/__about__.py"
[project.optional-dependencies]
test = ["pytest~=8.0", "numpy>=1.21,<3", "fev>=0.6.1", "pandas>=2.0,<2.4"]
extras = [
"boto3>=1.10,<2",
"peft>=0.13.0,<0.18",
"fev>=0.6.1",
"pandas[pyarrow]>=2.0,<2.4",
]
test = [
"pytest~=8.0",
"boto3>=1.10,<2",
"peft>=0.13.0,<1",
"fev>=0.6.1",
"pandas[pyarrow]>=2.0,<2.4",
]
typecheck = ["mypy~=1.9"]
dev = [
"gluonts[pro]~=0.16",

View file

@ -295,7 +295,7 @@ def chronos_2(
device: str = "cuda",
torch_dtype: str = "float32",
batch_size: int = 32,
predict_batches_jointly: bool = False,
cross_learning: bool = False,
):
"""Evaluate Chronos-2 models.
@ -316,7 +316,7 @@ def chronos_2(
batch_size : int, optional, default = 32
Batch size for inference. For Chronos-Bolt models, significantly larger
batch sizes can be used
predict_batches_jointly: bool, optional, default = False
cross_learning: bool, optional, default = False
If True, cross-learning is enabled and the model makes joint predictions for all
items in the batch
"""
@ -335,7 +335,7 @@ def chronos_2(
metrics_path=metrics_path,
model_id=model_id,
batch_size=batch_size,
predict_batches_jointly=predict_batches_jointly,
cross_learning=cross_learning,
)

View file

@ -1 +1 @@
__version__ = "2.0.0"
__version__ = "2.2.2"

View file

@ -17,8 +17,10 @@ import torch
if TYPE_CHECKING:
import datasets
import fev
import pandas as pd
from transformers import PreTrainedModel
from .utils import left_pad_and_stack_1D
@ -53,6 +55,14 @@ class BaseChronosPipeline(metaclass=PipelineRegistry):
# for easy access to the inner HF-style model
self.inner_model = inner_model
@property
def model_context_length(self) -> int:
raise NotImplementedError()
@property
def model_prediction_length(self) -> int:
raise NotImplementedError()
def _prepare_and_validate_context(self, context: Union[torch.Tensor, List[torch.Tensor]]):
if isinstance(context, list):
context = left_pad_and_stack_1D(context)
@ -122,6 +132,123 @@ class BaseChronosPipeline(metaclass=PipelineRegistry):
"""
raise NotImplementedError()
def predict_df(
self,
df: "pd.DataFrame",
*,
id_column: str = "item_id",
timestamp_column: str = "timestamp",
target: str = "target",
prediction_length: int | None = None,
quantile_levels: list[float] = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
validate_inputs: bool = True,
freq: str | None = None,
**predict_kwargs,
) -> "pd.DataFrame":
"""
Perform forecasting on time series data in a long-format pandas DataFrame.
Parameters
----------
df
Time series data in long format with an id column, a timestamp, and one target column.
Any other columns, if present, will be ignored
id_column
The name of the column which contains the unique time series identifiers, by default "item_id"
timestamp_column
The name of the column which contains timestamps, by default "timestamp"
All time series in the dataframe must have regular timestamps with the same frequency (no gaps)
target
The name of the column which contains the target variables to be forecasted, by default "target"
prediction_length
Number of steps to predict for each time series
quantile_levels
Quantile levels to compute
validate_inputs
[ADVANCED] When True (default), validates dataframes before prediction. Setting to False removes the
validation overhead, but may silently lead to wrong predictions if data is misformatted. When False, you
must ensure: (1) all dataframes are sorted by (id_column, timestamp_column); (2) future_df (if provided)
has the same item IDs as df with exactly prediction_length rows of future timestamps per item; (3) all
timestamps are regularly spaced (e.g., with hourly frequency).
freq
Frequency string for timestamp generation (e.g., "h", "D", "W"). Can only be used when
validate_inputs=False. When provided, skips frequency inference from the data.
**predict_kwargs
Additional arguments passed to predict_quantiles
Returns
-------
The forecasts dataframe generated by the model with the following columns
- `id_column`: The time series ID
- `timestamp_column`: Future timestamps
- "target_name": The name of the target column
- "predictions": The point predictions generated by the model
- One column for predictions at each quantile level in `quantile_levels`
"""
try:
import pandas as pd
from .df_utils import convert_df_input_to_list_of_dicts_input
except ImportError:
raise ImportError("pandas is required for predict_df. Please install it with `pip install pandas`.")
if not isinstance(target, str):
raise ValueError(
f"Expected `target` to be str, but found {type(target)}. {self.__class__.__name__} only supports univariate forecasting."
)
if prediction_length is None:
prediction_length = self.model_prediction_length
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
id_column=id_column,
timestamp_column=timestamp_column,
target_columns=[target],
prediction_length=prediction_length,
freq=freq,
validate_inputs=validate_inputs,
)
# NOTE: any covariates, if present, are ignored here
context = [torch.tensor(item["target"]).squeeze(0) for item in inputs] # squeeze the extra variate dim
# Generate forecasts
quantiles, mean = self.predict_quantiles(
inputs=context,
prediction_length=prediction_length,
quantile_levels=quantile_levels,
limit_prediction_length=False,
**predict_kwargs,
)
quantiles_np = quantiles.numpy() # [n_series, horizon, num_quantiles]
mean_np = mean.numpy() # [n_series, horizon]
series_ids = list(prediction_timestamps.keys())
future_ts = list(prediction_timestamps.values())
data = {
id_column: np.repeat(series_ids, prediction_length),
timestamp_column: np.concatenate(future_ts),
"target_name": target,
"predictions": mean_np.ravel(),
}
quantiles_flat = quantiles_np.reshape(-1, len(quantile_levels))
for q_idx, q_level in enumerate(quantile_levels):
data[str(q_level)] = quantiles_flat[:, q_idx]
predictions_df = pd.DataFrame(data)
# If validate_inputs=False, the df is used as-is without sorting by item_id, no reordering required
if validate_inputs:
predictions_df.set_index(id_column, inplace=True)
predictions_df = predictions_df.loc[original_order]
predictions_df.reset_index(inplace=True)
return predictions_df
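The reshaping at the end of `predict_df` can be hard to picture from the diff alone. The dependency-free sketch below mimics how forecasts of shape [n_series, horizon, num_quantiles] are flattened into long-format columns. It uses plain Python lists in place of NumPy and pandas; the helper name `flatten_forecasts` and the sample values are illustrative assumptions, not part of the chronos package.

```python
def flatten_forecasts(series_ids, future_ts, mean, quantiles, quantile_levels,
                      id_column="item_id", timestamp_column="timestamp", target="target"):
    """Flatten per-series forecasts into long-format columns (hypothetical helper).

    mean:      nested list of shape [n_series][horizon]
    quantiles: nested list of shape [n_series][horizon][num_quantiles]
    """
    horizon = len(mean[0])
    n_rows = len(series_ids) * horizon
    data = {
        # each id is repeated `horizon` times, like np.repeat in the diff
        id_column: [sid for sid in series_ids for _ in range(horizon)],
        # per-series timestamps are concatenated in series order
        timestamp_column: [ts for series_ts in future_ts for ts in series_ts],
        "target_name": [target] * n_rows,
        # row-major flattening, like ndarray.ravel()
        "predictions": [value for row in mean for value in row],
    }
    # one column per quantile level, keyed by the level rendered as a string
    for q_idx, q_level in enumerate(quantile_levels):
        data[str(q_level)] = [step[q_idx] for series in quantiles for step in series]
    return data


columns = flatten_forecasts(
    series_ids=["A", "B"],
    future_ts=[["t1", "t2"], ["t1", "t2"]],
    mean=[[1.0, 2.0], [3.0, 4.0]],
    quantiles=[[[0.5, 1.5], [1.5, 2.5]], [[2.5, 3.5], [3.5, 4.5]]],
    quantile_levels=[0.1, 0.9],
)
print(columns["item_id"])  # ['A', 'A', 'B', 'B']
print(columns["0.9"])      # [1.5, 2.5, 3.5, 4.5]
```

Every row of the result corresponds to one (series, forecast step) pair, which is why the id column repeats each identifier `prediction_length` times.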
def predict_fev(
self, task: "fev.Task", batch_size: int = 32, **kwargs
) -> tuple[list["datasets.DatasetDict"], float]:

View file

@ -6,6 +6,7 @@
import logging
import os
import re
import warnings
from pathlib import Path
import boto3
@ -110,6 +111,12 @@ def cache_model_from_s3(
# Use CloudFront CDN for faster, cached downloads if available
if cloudfront_url:
warnings.warn(
f"Loading {s3_uri} from CloudFront is deprecated and will be removed in a future version. "
f'Please specify a HuggingFace model_id instead. For example: Chronos2Pipeline.from_pretrained("amazon/chronos-2")',
category=FutureWarning,
stacklevel=3,
)
try:
download_model_files_from_cloudfront(
cloudfront_url=cloudfront_url,

View file

@ -377,6 +377,14 @@ class ChronosPipeline(BaseChronosPipeline):
self.tokenizer = tokenizer
self.model = model
@property
def model_context_length(self) -> int:
return self.model.config.context_length
@property
def model_prediction_length(self) -> int:
return self.model.config.prediction_length
def _prepare_and_validate_context(self, context: Union[torch.Tensor, List[torch.Tensor]]):
if isinstance(context, list):
context = left_pad_and_stack_1D(context)
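The new `model_context_length` and `model_prediction_length` properties follow a common pattern: the base class declares the property and raises `NotImplementedError`, and each concrete pipeline overrides it. The sketch below illustrates the pattern with hypothetical stand-in classes (`BasePipeline`, `ToyPipeline`) and a hypothetical helper method; none of these names exist in the chronos package.

```python
from typing import Optional


class BasePipeline:
    """Hypothetical stand-in for BaseChronosPipeline."""

    @property
    def model_prediction_length(self) -> int:
        # subclasses are expected to override this property
        raise NotImplementedError()

    def resolve_prediction_length(self, prediction_length: Optional[int] = None) -> int:
        # fall back to the model default when no horizon is given, as predict_df does
        if prediction_length is None:
            prediction_length = self.model_prediction_length
        return prediction_length


class ToyPipeline(BasePipeline):
    """Hypothetical subclass that supplies the model default."""

    def __init__(self, default_horizon: int):
        self._default_horizon = default_horizon

    @property
    def model_prediction_length(self) -> int:
        return self._default_horizon


pipeline = ToyPipeline(default_horizon=64)
print(pipeline.resolve_prediction_length())    # 64
print(pipeline.resolve_prediction_length(12))  # 12
```

Keeping the default lookup in the base class lets shared methods such as `predict_df` work for any pipeline that implements the property.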

View file

@ -15,7 +15,6 @@ from torch.utils.data import IterableDataset
if TYPE_CHECKING:
import datasets
import fev
import pandas as pd
TensorOrArray: TypeAlias = torch.Tensor | np.ndarray
@ -105,9 +104,29 @@ def validate_and_prepare_single_dict_task(
f"Found invalid type for `past_covariates` in element at index {idx}. "
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(task_past_covariates)}'
)
# gather keys and ensure known-future keys come last to match downstream assumptions
task_covariates_keys = sorted(task_past_covariates.keys())
task_future_covariates = task.get("future_covariates", {})
if not isinstance(task_future_covariates, dict):
raise ValueError(
f"Found invalid type for `future_covariates` in element at index {idx}. "
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(task_future_covariates)}'
)
task_future_covariates_keys = sorted(task_future_covariates.keys())
if not set(task_future_covariates_keys).issubset(task_covariates_keys):
raise ValueError(
f"Expected keys in `future_covariates` to be a subset of `past_covariates` {task_covariates_keys}, "
f"but found {task_future_covariates_keys} in element at index {idx}"
)
# create ordered keys: past-only first, then known-future (so known-future are the last rows)
task_past_only_keys = [k for k in task_covariates_keys if k not in task_future_covariates_keys] # past_only_keys
task_ordered_covariate_keys = task_past_only_keys + task_future_covariates_keys
task_past_covariates_list: list[torch.Tensor] = []
for key in task_covariates_keys:
for key in task_ordered_covariate_keys:
tensor = task_past_covariates[key]
if isinstance(tensor, np.ndarray):
# apply encoding to categorical variates
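The key reordering introduced in this hunk can be illustrated in isolation. The sketch below reproduces only the ordering rule (past-only covariates first, known-future covariates last) with plain Python lists; the function name `order_covariate_keys` and the sample covariate names are illustrative assumptions.

```python
def order_covariate_keys(past_covariates: dict, future_covariates: dict) -> list:
    """Return covariate keys with past-only keys first and known-future keys last."""
    past_keys = sorted(past_covariates.keys())
    future_keys = sorted(future_covariates.keys())
    # known-future covariates must also be observed in the past
    if not set(future_keys).issubset(past_keys):
        raise ValueError(
            f"Expected keys in `future_covariates` to be a subset of {past_keys}, "
            f"but found {future_keys}"
        )
    past_only_keys = [k for k in past_keys if k not in future_keys]
    return past_only_keys + future_keys


ordered = order_covariate_keys(
    past_covariates={"price": [1, 2], "holiday": [0, 1], "temperature": [20, 21]},
    future_covariates={"holiday": [0]},
)
print(ordered)  # ['price', 'temperature', 'holiday']
```

With this ordering, the last `len(future_covariates)` rows of the stacked covariate tensor are exactly the known-future covariates, which matches the slicing that `Chronos2Dataset` performs later in this diff.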
@ -140,21 +159,10 @@ def validate_and_prepare_single_dict_task(
if task_past_covariates_list
else torch.zeros((0, history_length), device=task_target.device)
)
# validate future_covariates
task_future_covariates = task.get("future_covariates", {})
if not isinstance(task_future_covariates, dict):
raise ValueError(
f"Found invalid type for `future_covariates` in element at index {idx}. "
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(task_future_covariates)}'
)
task_future_covariates_keys = sorted(task_future_covariates.keys())
if not set(task_future_covariates_keys).issubset(task_covariates_keys):
raise ValueError(
f"Expected keys in `future_covariates` to be a subset of `past_covariates` {task_covariates_keys}, "
f"but found {task_future_covariates_keys} in element at index {idx}"
)
# validate future_covariates (build rows in the same task_ordered_covariate_keys order)
task_future_covariates_list: list[torch.Tensor] = []
for key in task_covariates_keys:
for key in task_ordered_covariate_keys:
# future values of past-only covariates are filled with NaNs
tensor = task_future_covariates.get(key, torch.full((prediction_length,), fill_value=torch.nan))
if isinstance(tensor, np.ndarray):
@ -186,7 +194,8 @@ def validate_and_prepare_single_dict_task(
).to(dtype=torch.float32)
task_n_targets = task_target.shape[0]
task_n_covariates = task_past_covariates_tensor.shape[0]
task_n_future_covariates = len(task_future_covariates_list)
# number of known-future covariates
task_n_future_covariates = len(task_future_covariates_keys)
return (
task_context_tensor,
@ -265,308 +274,6 @@ def convert_tensor_input_to_list_of_dicts_input(tensor: TensorOrArray) -> list[d
return output
def _validate_df_types_and_cast(
df: "pd.DataFrame",
future_df: "pd.DataFrame | None",
target_columns: list[str],
id_column: str = "item_id",
timestamp_column: str = "timestamp",
) -> tuple["pd.DataFrame", "pd.DataFrame | None"]:
import pandas as pd
astype_dict = {}
future_astype_dict = {}
for col in df.columns.drop([id_column, timestamp_column]):
col_dtype = df[col].dtype
if col in target_columns and not pd.api.types.is_numeric_dtype(df[col]):
raise ValueError(f"All target columns must be numeric but got {col=} with dtype={col_dtype}")
if (
pd.api.types.is_object_dtype(df[col])
or pd.api.types.is_string_dtype(df[col])
or isinstance(col_dtype, pd.CategoricalDtype)
):
astype_dict[col] = "category"
elif pd.api.types.is_numeric_dtype(df[col]) or pd.api.types.is_bool_dtype(df[col]):
astype_dict[col] = "float32"
else:
raise ValueError(
f"All columns must contain numeric, object, category, string, or bool dtype but got {col=} with dtype={col_dtype}"
)
if future_df is not None and col in future_df.columns:
if future_df[col].dtype != col_dtype:
raise ValueError(
f"Column {col} in future_df has dtype {future_df[col].dtype} but column in df has dtype {col_dtype}"
)
future_astype_dict[col] = astype_dict[col]
df = df.astype(astype_dict, copy=True)
if future_df is not None:
future_df = future_df.astype(future_astype_dict, copy=True)
return df, future_df
def validate_df_inputs(
df: "pd.DataFrame",
future_df: "pd.DataFrame | None",
target_columns: list[str],
prediction_length: int,
id_column: str = "item_id",
timestamp_column: str = "timestamp",
) -> tuple["pd.DataFrame", "pd.DataFrame | None", "pd.Timedelta", list[int], list[int] | None, np.ndarray]:
"""
Validates and prepares dataframe inputs passed to `Chronos2Pipeline.predict_df`.
Parameters
----------
df
Input dataframe containing time series data with columns:
- id_column: Identifier for each time series
- timestamp_column: Timestamps for each observation
- target_columns: One or more target variables to forecast
- Additional columns are treated as covariates
future_df
Optional dataframe containing future covariate values with columns:
- id_column: Identifier for each time series
- timestamp_column: Future timestamps
- Subset of covariate columns from df
target_columns
Names of target columns to forecast
prediction_length
Number of future time steps to predict
id_column
Name of column containing time series identifiers
timestamp_column
Name of column containing timestamps
Returns
-------
A tuple containing:
- Validated and sorted input dataframe
- Validated and sorted future dataframe (if provided)
- Inferred frequency of the time series
- List of series lengths from input dataframe
- List of series lengths from future dataframe (if provided)
- Original order of time series IDs
Raises
------
ValueError
If validation fails for:
- Missing required columns
- Invalid data types
- Inconsistent frequencies
- Insufficient data points
- Mismatched series between df and future_df
- Invalid future_df lengths
"""
import pandas as pd
required_cols = [id_column, timestamp_column] + target_columns
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
raise ValueError(f"df does not contain all expected columns. Missing columns: {missing_cols}")
if future_df is not None:
future_required_cols = [id_column, timestamp_column]
missing_future_cols = [col for col in future_required_cols if col not in future_df.columns]
targets_in_future = [col for col in future_df.columns if col in target_columns]
extra_future_cols = [col for col in future_df.columns if col not in df.columns]
if missing_future_cols:
raise ValueError(
f"future_df does not contain all expected columns. Missing columns: {missing_future_cols}"
)
if targets_in_future:
raise ValueError(
f"future_df cannot contain target columns. Target columns found in future_df: {targets_in_future}"
)
if extra_future_cols:
raise ValueError(f"future_df cannot contain columns not present in df. Extra columns: {extra_future_cols}")
df, future_df = _validate_df_types_and_cast(
df, future_df, id_column=id_column, timestamp_column=timestamp_column, target_columns=target_columns
)
# Get the original order of time series IDs
original_order = df[id_column].unique()
# Sort and prepare df
df[timestamp_column] = pd.to_datetime(df[timestamp_column])
df = df.sort_values([id_column, timestamp_column])
# Get series lengths
series_lengths = df[id_column].value_counts(sort=False).to_list()
def validate_freq(timestamps: pd.Series, series_id: str):
freq = pd.infer_freq(timestamps)
if not freq:
raise ValueError(f"Could not infer frequency for series {series_id}")
return freq
# Validate each series
all_freqs = []
start_idx = 0
for length in series_lengths:
if length < 3:
series_id = df.iloc[start_idx][id_column]
raise ValueError(
f"Every time series must have at least 3 data points, found {length=} for series {series_id}"
)
series_data = df.iloc[start_idx : start_idx + length]
timestamps = series_data[timestamp_column]
series_id = series_data.iloc[0][id_column]
all_freqs.append(validate_freq(timestamps, series_id))
start_idx += length
if len(set(all_freqs)) > 1:
raise ValueError("All time series must have the same frequency")
inferred_freq = all_freqs[0]
# Sort future_df if provided and validate its series lengths
future_series_lengths = None
if future_df is not None:
future_df[timestamp_column] = pd.to_datetime(future_df[timestamp_column])
future_df = future_df.sort_values([id_column, timestamp_column])
# Validate that future_df contains all series from df
context_ids = set(df[id_column].unique())
future_ids = set(future_df[id_column].unique())
if context_ids != future_ids:
raise ValueError("future_df must contain the same time series IDs as df")
future_series_lengths = future_df[id_column].value_counts(sort=False).to_list()
# Validate future series lengths match prediction_length
future_start_idx = 0
for future_length in future_series_lengths:
future_series_data = future_df.iloc[future_start_idx : future_start_idx + future_length]
future_timestamps = future_series_data[timestamp_column]
future_series_id = future_series_data.iloc[0][id_column]
if future_length != prediction_length:
raise ValueError(
f"Future covariates all time series must have length {prediction_length}, got {future_length} for series {future_series_id}"
)
if future_length < 3 or inferred_freq != validate_freq(future_timestamps, future_series_id):
raise ValueError(
f"Future covariates must have the same frequency as context, found series {future_series_id} with a different frequency"
)
future_start_idx += future_length
assert len(series_lengths) == len(future_series_lengths)
return df, future_df, inferred_freq, series_lengths, future_series_lengths, original_order
def convert_df_input_to_list_of_dicts_input(
df: "pd.DataFrame",
future_df: "pd.DataFrame | None",
target_columns: list[str],
prediction_length: int,
id_column: str = "item_id",
timestamp_column: str = "timestamp",
) -> tuple[list[dict[str, np.ndarray | dict[str, np.ndarray]]], np.ndarray, dict[str, "pd.DatetimeIndex"]]:
"""
Convert from dataframe input format to a list of dictionaries input format.
Parameters
----------
df
Input dataframe containing time series data with columns:
- id_column: Identifier for each time series
- timestamp_column: Timestamps for each observation
- target_columns: One or more target variables to forecast
- Additional columns are treated as covariates
future_df
Optional dataframe containing future covariate values with columns:
- id_column: Identifier for each time series
- timestamp_column: Future timestamps
- Subset of covariate columns from df
target_columns
Names of target columns to forecast
prediction_length
Number of future time steps to predict
id_column
Name of column containing time series identifiers
timestamp_column
Name of column containing timestamps
Returns
-------
A tuple containing:
- List of dictionaries in the format expected by `Chronos2Pipeline.predict`
- Original order of time series IDs
- Dictionary mapping series IDs to future time index
"""
import pandas as pd
df, future_df, freq, series_lengths, future_series_lengths, original_order = validate_df_inputs(
df,
future_df=future_df,
id_column=id_column,
timestamp_column=timestamp_column,
target_columns=target_columns,
prediction_length=prediction_length,
)
# Convert to list of dicts format
inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = []
prediction_timestamps: dict[str, pd.DatetimeIndex] = {}
start_idx: int = 0
future_start_idx: int = 0
for i, length in enumerate(series_lengths):
series_data = df.iloc[start_idx : start_idx + length]
# Extract target(s)
target_data = series_data[target_columns].to_numpy().T # Shape: (n_targets, history_length)
task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_data}
# Generate future timestamps
series_id = series_data.iloc[0][id_column]
last_timestamp = series_data[timestamp_column].iloc[-1]
future_ts = pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq=freq)[1:]
prediction_timestamps[series_id] = future_ts
# Handle covariates if present
covariate_cols = [
col for col in series_data.columns if col not in [id_column, timestamp_column] + target_columns
]
if covariate_cols:
past_covariates = {col: series_data[col].to_numpy() for col in covariate_cols}
task["past_covariates"] = past_covariates
# Handle future covariates
if future_df is not None:
assert future_series_lengths is not None
future_length = future_series_lengths[i]
future_data = future_df.iloc[future_start_idx : future_start_idx + future_length]
assert future_data[timestamp_column].iloc[0] == future_ts[0], (
f"the first timestamp in future_df must be the first forecast timestamp, found mismatch "
f"({future_data[timestamp_column].iloc[0]} != {future_ts[0]}) in series {series_id}"
)
if len(future_data) > 0:
future_covariates = {
col: future_data[col].to_numpy() for col in covariate_cols if col in future_data.columns
}
if future_covariates:
task["future_covariates"] = future_covariates
future_start_idx += future_length
inputs.append(task)
start_idx += length
assert len(inputs) == len(series_lengths)
return inputs, original_order, prediction_timestamps
def _cast_fev_features(
past_data: "datasets.Dataset",
future_data: "datasets.Dataset",
@ -770,6 +477,7 @@ class Chronos2Dataset(IterableDataset):
task_n_covariates,
task_n_future_covariates,
) = self.tasks[task_idx]
task_past_tensor, task_future_tensor = task_past_tensor.clone(), task_future_tensor.clone()
task_n_past_only_covariates = task_n_covariates - task_n_future_covariates
full_length = task_past_tensor.shape[-1]
@ -795,7 +503,9 @@ class Chronos2Dataset(IterableDataset):
# the task_context_tensor by slicing the appropriate indices which we do below
if self.mode in [DatasetMode.TRAIN, DatasetMode.VALIDATION]:
# the first task_n_targets elements in task_context_tensor are the targets
task_future_target = task_past_tensor[:, slice_idx : slice_idx + self.prediction_length]
task_future_target = task_past_tensor[:, slice_idx : slice_idx + self.prediction_length].clone()
# mask out all rows corresponding to covariates
task_future_target[task_n_targets:] = torch.nan
if task_n_future_covariates > 0:
# the last task_n_future_covariates elements in task_context_tensor are the known covariates

View file

@ -547,6 +547,74 @@ class Chronos2Model(PreTrainedModel):
return loss
def encode(
self,
context: torch.Tensor,
context_mask: torch.Tensor | None = None,
group_ids: torch.Tensor | None = None,
future_covariates: torch.Tensor | None = None,
future_covariates_mask: torch.Tensor | None = None,
num_output_patches: int = 1,
future_target: torch.Tensor | None = None,
future_target_mask: torch.Tensor | None = None,
output_attentions: bool = False,
):
self._validate_input(
context=context,
context_mask=context_mask,
future_covariates=future_covariates,
future_covariates_mask=future_covariates_mask,
group_ids=group_ids,
num_output_patches=num_output_patches,
future_target=future_target,
future_target_mask=future_target_mask,
)
batch_size = context.shape[0]
patched_context, attention_mask, loc_scale = self._prepare_patched_context(
context=context, context_mask=context_mask
)
num_context_patches = attention_mask.shape[-1]
# get input embeddings of shape (batch, num_context_patches, d_model)
input_embeds: torch.Tensor = self.input_patch_embedding(patched_context)
# append [REG] special token embedding, if needed
if self.chronos_config.use_reg_token:
reg_input_ids = torch.full((batch_size, 1), self.config.reg_token_id, device=input_embeds.device)
reg_embeds = self.shared(reg_input_ids)
input_embeds = torch.cat([input_embeds, reg_embeds], dim=-2)
attention_mask = torch.cat(
[attention_mask.to(self.dtype), torch.ones_like(reg_input_ids).to(self.dtype)], dim=-1
)
patched_future, patched_future_covariates_mask = self._prepare_patched_future(
future_covariates=future_covariates,
future_covariates_mask=future_covariates_mask,
loc_scale=loc_scale,
num_output_patches=num_output_patches,
batch_size=batch_size,
)
future_attention_mask = torch.ones(batch_size, num_output_patches, dtype=self.dtype, device=self.device)
# get future embeddings of shape (batch, num_output_patches, d_model)
future_embeds: torch.Tensor = self.input_patch_embedding(patched_future)
# concatenate context and future embeddings and masks
input_embeds = torch.cat([input_embeds, future_embeds], dim=-2)
attention_mask = torch.cat([attention_mask, future_attention_mask], dim=-1)
if group_ids is None:
# by default, each time series is treated independently, i.e., no mixing across the batch
group_ids = torch.arange(batch_size, dtype=torch.long, device=self.device)
encoder_outputs: Chronos2EncoderOutput = self.encoder(
attention_mask=attention_mask,
inputs_embeds=input_embeds,
group_ids=group_ids,
output_attentions=output_attentions,
)
return encoder_outputs, loc_scale, patched_future_covariates_mask, num_context_patches
def forward(
self,
context: torch.Tensor,
@ -625,63 +693,19 @@ class Chronos2Model(PreTrainedModel):
- enc_time_self_attn_weights: Time self attention weights, if output_attentions=True
- enc_group_self_attn_weights: Group self attention weights, if output_attentions=True
"""
self._validate_input(
batch_size = context.shape[0]
encoder_outputs, loc_scale, patched_future_covariates_mask, num_context_patches = self.encode(
context=context,
context_mask=context_mask,
group_ids=group_ids,
future_covariates=future_covariates,
future_covariates_mask=future_covariates_mask,
group_ids=group_ids,
num_output_patches=num_output_patches,
future_target=future_target,
future_target_mask=future_target_mask,
)
batch_size = context.shape[0]
patched_context, attention_mask, loc_scale = self._prepare_patched_context(
context=context, context_mask=context_mask
)
num_context_patches = attention_mask.shape[-1]
# get input embeddings of shape (batch, num_context_patches, d_model)
input_embeds: torch.Tensor = self.input_patch_embedding(patched_context)
# append [REG] special token embedding, if needed
if self.chronos_config.use_reg_token:
reg_input_ids = torch.full((batch_size, 1), self.config.reg_token_id, device=input_embeds.device)
reg_embeds = self.shared(reg_input_ids)
input_embeds = torch.cat([input_embeds, reg_embeds], dim=-2)
attention_mask = torch.cat(
[attention_mask.to(self.dtype), torch.ones_like(reg_input_ids).to(self.dtype)], dim=-1
)
patched_future, patched_future_covariates_mask = self._prepare_patched_future(
future_covariates=future_covariates,
future_covariates_mask=future_covariates_mask,
loc_scale=loc_scale,
num_output_patches=num_output_patches,
batch_size=batch_size,
)
future_attention_mask = torch.ones(batch_size, num_output_patches, dtype=self.dtype, device=self.device)
# get future embeddings of shape (batch, num_output_patches, d_model)
future_embeds: torch.Tensor = self.input_patch_embedding(patched_future)
# concatenate context and future embeddings and masks
input_embeds = torch.cat([input_embeds, future_embeds], dim=-2)
attention_mask = torch.cat([attention_mask, future_attention_mask], dim=-1)
if group_ids is None:
# by default, each time series is treated independently, i.e., no mixing across the batch
group_ids = torch.arange(batch_size, dtype=torch.long, device=self.device)
encoder_outputs: Chronos2EncoderOutput = self.encoder(
attention_mask=attention_mask,
inputs_embeds=input_embeds,
group_ids=group_ids,
output_attentions=output_attentions,
)
hidden_states: torch.Tensor = encoder_outputs[0]
assert hidden_states.shape == (batch_size, num_context_patches + 1 + num_output_patches, self.model_dim)
# slice the last num_output_patches hidden states to be input into the output_patch_embedding


@ -9,29 +9,29 @@ import time
import warnings
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Sequence
from typing import TYPE_CHECKING, Callable, Literal, Mapping, Sequence
import numpy as np
import torch
from einops import rearrange, repeat
from torch.utils.data import DataLoader
from transformers import AutoConfig
from transformers.utils.import_utils import is_peft_available
from transformers.utils.peft_utils import find_adapter_config_file
import chronos.chronos2
from chronos.base import BaseChronosPipeline, ForecastType
from chronos.chronos2 import Chronos2Model
from chronos.chronos2.dataset import (
Chronos2Dataset,
DatasetMode,
TensorOrArray,
convert_df_input_to_list_of_dicts_input,
)
from chronos.chronos2.dataset import Chronos2Dataset, DatasetMode, TensorOrArray
from chronos.df_utils import convert_df_input_to_list_of_dicts_input
from chronos.utils import interpolate_quantiles, weighted_quantile
if TYPE_CHECKING:
import datasets
import fev
import pandas as pd
from peft import LoraConfig
from transformers.trainer_callback import TrainerCallback
logger = logging.getLogger(__name__)
@ -103,13 +103,18 @@ class Chronos2Pipeline(BaseChronosPipeline):
| Sequence[TensorOrArray]
| Sequence[Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]]]
| None = None,
finetune_mode: Literal["full", "lora"] = "full",
lora_config: "LoraConfig | dict | None" = None,
context_length: int | None = None,
learning_rate: float = 1e-5,
learning_rate: float = 1e-6,
num_steps: int = 1000,
batch_size: int = 256,
output_dir: Path | str | None = None,
min_past: int | None = None,
finetuned_ckpt_name: str = "finetuned-ckpt",
callbacks: list["TrainerCallback"] | None = None,
remove_printer_callback: bool = False,
disable_data_parallel: bool = True,
**extra_trainer_kwargs,
) -> "Chronos2Pipeline":
"""
@ -127,10 +132,16 @@ class Chronos2Pipeline(BaseChronosPipeline):
validation_inputs
The time series used for validation and model selection. The format of `validation_inputs` is exactly the same as `inputs`, by default None which
means that no validation is performed. Note that enabling validation may slow down fine-tuning for large datasets.
finetune_mode
One of "full" (performs full fine-tuning) or "lora" (performs Low Rank Adaptation (LoRA) fine-tuning), by default "full"
lora_config
The configuration to use for LoRA fine-tuning when finetune_mode="lora". Can be a `LoraConfig` object or a dict which is used to initialize `LoraConfig`.
When unspecified and finetune_mode="lora", a default configuration is used
context_length
The maximum context length used during fine-tuning, by default set to the model's default context length
learning_rate
The learning rate for the optimizer, by default 1e-5
The learning rate for the optimizer, by default 1e-6
When finetune_mode="lora", we recommend using a higher value of the learning rate, such as 1e-5
num_steps
The number of steps to fine-tune for, by default 1000
batch_size
@ -144,6 +155,12 @@ class Chronos2Pipeline(BaseChronosPipeline):
are filtered out, by default set equal to prediction_length
finetuned_ckpt_name
The name of the directory inside `output_dir` in which the final fine-tuned checkpoint will be saved, by default "finetuned-ckpt"
callbacks
A list of `TrainerCallback`s which will be forwarded to the HuggingFace `Trainer`
remove_printer_callback
If True, all instances of `PrinterCallback` are removed from callbacks
disable_data_parallel
If True, ensures that DataParallel is disabled and training happens on a single GPU
**extra_trainer_kwargs
Extra kwargs are directly forwarded to `TrainingArguments`
@ -153,21 +170,59 @@ class Chronos2Pipeline(BaseChronosPipeline):
"""
import torch.cuda
from transformers.trainer_callback import PrinterCallback
from transformers.training_args import TrainingArguments
if finetune_mode == "lora":
if is_peft_available():
from peft import LoraConfig, get_peft_model
else:
warnings.warn(
"`peft` is required for `finetune_mode='lora'`. Please install it with `pip install peft`. Falling back to `finetune_mode='full'`."
)
finetune_mode = "full"
lora_config = None
from chronos.chronos2.trainer import Chronos2Trainer, EvaluateAndSaveFinalStepCallback
warnings.warn(
"Fine-tuning support is experimental and may be changed in future versions.",
category=FutureWarning,
stacklevel=2,
)
assert finetune_mode in ["full", "lora"], f"finetune_mode must be one of ['full', 'lora'], got {finetune_mode}"
if finetune_mode == "full" and lora_config is not None:
raise ValueError(
"lora_config should not be specified when `finetune_mode='full'`. To enable LoRA, set `finetune_mode='lora'`."
)
# Create a copy of the model to avoid modifying the original
config = deepcopy(self.model.config)
model = Chronos2Model(config).to(self.model.device) # type: ignore
model.load_state_dict(self.model.state_dict())
if finetune_mode == "lora":
if lora_config is None:
lora_config = LoraConfig(
r=8,
lora_alpha=16,
target_modules=[
"self_attention.q",
"self_attention.v",
"self_attention.k",
"self_attention.o",
"output_patch_embedding.output_layer",
],
)
elif isinstance(lora_config, dict):
lora_config = LoraConfig(**lora_config)
else:
assert isinstance(lora_config, LoraConfig), (
f"lora_config must be an instance of LoraConfig or a dict, got {type(lora_config)}"
)
model = get_peft_model(model, lora_config)
n_trainable_params, n_params = model.get_nb_trainable_parameters()
logger.info(
f"Using LoRA. Number of trainable parameters: {n_trainable_params}, total parameters: {n_params}."
)
if context_length is None:
context_length = self.model_context_length
@ -217,7 +272,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
report_to="none",
max_steps=num_steps,
gradient_accumulation_steps=1,
dataloader_num_workers=1,
dataloader_num_workers=0,
tf32=has_sm80 and not use_cpu,
bf16=has_sm80 and not use_cpu,
save_only_model=True,
@ -233,7 +288,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
)
eval_dataset = None
callbacks = []
callbacks = callbacks or []
if validation_inputs is not None:
# construct validation dataset
eval_dataset = Chronos2Dataset.convert_inputs(
@ -267,6 +322,11 @@ class Chronos2Pipeline(BaseChronosPipeline):
training_args = TrainingArguments(**training_kwargs)
if disable_data_parallel and not use_cpu:
# This is a hack to disable the default `transformers` behavior of using DataParallel
training_args._n_gpu = 1
assert training_args.n_gpu == 1 # Ensure that the hack worked
trainer = Chronos2Trainer(
model=model,
args=training_args,
@ -274,12 +334,19 @@ class Chronos2Pipeline(BaseChronosPipeline):
eval_dataset=eval_dataset,
callbacks=callbacks,
)
if remove_printer_callback:
trainer.pop_callback(PrinterCallback)
trainer.train()
# update max_output_patches, if the model was fine-tuned with longer prediction_length
# update context_length and max_output_patches, if the model was fine-tuned with larger values
model.chronos_config.context_length = max(model.chronos_config.context_length, context_length)
model.chronos_config.max_output_patches = max(
model.chronos_config.max_output_patches, math.ceil(prediction_length / self.model_output_patch_size)
)
# update chronos_config in model's config, so it is saved correctly
model.config.chronos_config = model.chronos_config.__dict__
# Create a new pipeline with the fine-tuned model
finetuned_pipeline = Chronos2Pipeline(model=model)
@ -388,7 +455,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
prediction_length: int | None = None,
batch_size: int = 256,
context_length: int | None = None,
predict_batches_jointly: bool = False,
cross_learning: bool = False,
limit_prediction_length: bool = False,
**kwargs,
) -> list[torch.Tensor]:
@ -474,7 +541,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
will be lower than this value, by default 256
context_length
The maximum context length used during inference, by default set to the model's default context length
predict_batches_jointly
cross_learning
If True, cross-learning is enabled, i.e., all the tasks in `inputs` will be predicted jointly and the model will share information across all inputs, by default False
The following must be noted when using cross-learning:
- Cross-learning doesn't always improve forecast accuracy and must be tested for individual use cases.
@ -494,6 +561,14 @@ class Chronos2Pipeline(BaseChronosPipeline):
if prediction_length is None:
prediction_length = model_prediction_length
if kwargs.get("predict_batches_jointly") is not None:
warnings.warn(
"The `predict_batches_jointly` argument is deprecated and will be removed in a future version. "
"Please use `cross_learning=True` to enable the cross-learning mode.",
category=FutureWarning,
stacklevel=2,
)
cross_learning = kwargs.pop("predict_batches_jointly")
# The maximum number of output patches to generate in a single forward pass before the long-horizon heuristic kicks in. Note: A value larger
# than the model's default max_output_patches may lead to degradation in forecast accuracy, defaults to a model-specific value
max_output_patches = kwargs.pop("max_output_patches", self.max_output_patches)
@ -502,6 +577,8 @@ class Chronos2Pipeline(BaseChronosPipeline):
# effective batch size increases by a factor of `len(unrolled_quantiles)` when making long-horizon predictions,
# by default [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
unrolled_quantiles = kwargs.pop("unrolled_quantiles", [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
# A callback which is called after each batch has been processed
after_batch_callback: Callable = kwargs.pop("after_batch", lambda: None)
if len(kwargs) > 0:
raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}.")
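
Review note on the `predict_batches_jointly` deprecation shim: the pattern is to read the legacy keyword from `**kwargs`, warn, map it to the new argument, and only then reject leftover keywords. A minimal standalone sketch of that pattern follows. The helper `resolve_cross_learning` is hypothetical and exists only for illustration; the real logic lives inline in `predict`.

```python
import warnings


def resolve_cross_learning(cross_learning: bool = False, **kwargs) -> bool:
    """Hypothetical helper mirroring the deprecation shim shown in the hunk above."""
    if kwargs.get("predict_batches_jointly") is not None:
        warnings.warn(
            "`predict_batches_jointly` is deprecated; use `cross_learning` instead.",
            category=FutureWarning,
            stacklevel=2,
        )
        # The legacy value overrides the new argument
        cross_learning = kwargs.pop("predict_batches_jointly")
    if len(kwargs) > 0:
        raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}.")
    return cross_learning


# Legacy call path: emits one FutureWarning and still enables cross-learning
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy_result = resolve_cross_learning(predict_batches_jointly=True)

# New call path: no warning expected
new_result = resolve_cross_learning(cross_learning=True)
```

One consequence of checking `kwargs.get(...) is not None` in this sketch: an explicit `predict_batches_jointly=None` is not popped and therefore reaches the `TypeError` branch. The hunk above appears to behave the same way, which may or may not be intended.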
@ -542,7 +619,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
mode=DatasetMode.TEST,
)
test_loader = DataLoader(
test_dataset, batch_size=None, num_workers=1, pin_memory=True, shuffle=False, drop_last=False
test_dataset, batch_size=None, pin_memory=self.model.device.type == "cuda", shuffle=False, drop_last=False
)
all_predictions: list[torch.Tensor] = []
@ -553,7 +630,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
batch_future_covariates = batch["future_covariates"]
batch_target_idx_ranges = batch["target_idx_ranges"]
if predict_batches_jointly:
if cross_learning:
batch_group_ids = torch.zeros_like(batch_group_ids)
batch_prediction = self._predict_batch(
@ -566,6 +643,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
target_idx_ranges=batch_target_idx_ranges,
)
all_predictions.extend(batch_prediction)
after_batch_callback()
return all_predictions
@ -744,6 +822,10 @@ class Chronos2Pipeline(BaseChronosPipeline):
prediction_length: int | None = None,
quantile_levels: list[float] = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
batch_size: int = 256,
context_length: int | None = None,
cross_learning: bool = False,
validate_inputs: bool = True,
freq: str | None = None,
**predict_kwargs,
) -> "pd.DataFrame":
"""
@ -773,6 +855,24 @@ class Chronos2Pipeline(BaseChronosPipeline):
The batch size used for prediction. Note that the batch size here means the number of time series, including target(s) and covariates,
which are input into the model. If your data has multiple target and/or covariates, the effective number of time series tasks in a batch
will be lower than this value, by default 256
context_length
The maximum context length used during inference, by default set to the model's default context length
cross_learning
If True, cross-learning is enabled, i.e., all the tasks in `inputs` will be predicted jointly and the model will share information across all inputs, by default False
The following must be noted when using cross-learning:
- Cross-learning doesn't always improve forecast accuracy and must be tested for individual use cases.
- Results become dependent on batch size. Very large batch sizes may not provide benefits as they deviate from the maximum group size used during pretraining.
For optimal results, consider using a batch size around 100 (as used in the Chronos-2 technical report).
- Cross-learning is most helpful when individual time series have limited historical context, as the model can leverage patterns from related series in the batch.
validate_inputs
[ADVANCED] When True (default), validates dataframes before prediction. Setting to False removes the
validation overhead, but may silently lead to wrong predictions if data is misformatted. When False, you
must ensure: (1) all dataframes are sorted by (id_column, timestamp_column); (2) future_df (if provided)
has the same item IDs as df with exactly prediction_length rows of future timestamps per item; (3) all
timestamps are regularly spaced (e.g., with hourly frequency).
freq
Frequency string for timestamp generation (e.g., "h", "D", "W"). Can only be used when
validate_inputs=False. When provided, skips frequency inference from the data.
**predict_kwargs
Additional arguments passed to predict_quantiles
@ -803,6 +903,8 @@ class Chronos2Pipeline(BaseChronosPipeline):
timestamp_column=timestamp_column,
target_columns=target,
prediction_length=prediction_length,
freq=freq,
validate_inputs=validate_inputs,
)
# Generate forecasts
@ -812,33 +914,37 @@ class Chronos2Pipeline(BaseChronosPipeline):
quantile_levels=quantile_levels,
limit_prediction_length=False,
batch_size=batch_size,
context_length=context_length,
cross_learning=cross_learning,
**predict_kwargs,
)
# since predict_df tasks are homogeneous by input design, we can safely stack the list of tensors into a single tensor
quantiles_np = torch.stack(quantiles).numpy() # [n_tasks, n_variates, horizon, num_quantiles]
mean_np = torch.stack(mean).numpy() # [n_tasks, n_variates, horizon]
results_dfs = []
for i, (series_id, future_ts) in enumerate(prediction_timestamps.items()):
q_pred = quantiles_np[i] # (n_variates, prediction_length, len(quantile_levels))
point_pred = mean_np[i] # (n_variates, prediction_length)
n_tasks = len(prediction_timestamps)
n_variates = len(target)
for target_idx, target_col in enumerate(target):
series_forecast_data: dict[str | tuple[str, str], Any] = {
id_column: series_id,
timestamp_column: future_ts,
"target_name": target_col,
}
series_forecast_data["predictions"] = point_pred[target_idx]
for q_idx, q_level in enumerate(quantile_levels):
series_forecast_data[str(q_level)] = q_pred[target_idx, :, q_idx]
series_ids = list(prediction_timestamps.keys())
future_ts = list(prediction_timestamps.values())
results_dfs.append(pd.DataFrame(series_forecast_data))
data = {
id_column: np.repeat(series_ids, n_variates * prediction_length),
timestamp_column: np.concatenate([np.tile(ts, n_variates) for ts in future_ts]),
"target_name": np.tile(np.repeat(target, prediction_length), n_tasks),
"predictions": mean_np.ravel(),
}
predictions_df = pd.concat(results_dfs, ignore_index=True)
predictions_df.set_index(id_column, inplace=True)
predictions_df = predictions_df.loc[original_order]
predictions_df.reset_index(inplace=True)
quantiles_flat = quantiles_np.reshape(-1, len(quantile_levels))
for q_idx, q_level in enumerate(quantile_levels):
data[str(q_level)] = quantiles_flat[:, q_idx]
predictions_df = pd.DataFrame(data)
# If validate_inputs=False, the df is used as-is without sorting by item_id, no reordering required
if validate_inputs:
predictions_df.set_index(id_column, inplace=True)
predictions_df = predictions_df.loc[original_order]
predictions_df.reset_index(inplace=True)
return predictions_df
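
Review note on the vectorized dataframe construction in `predict_df`: the `np.repeat` / `np.tile` columns must line up with the C-order flattening of the `[n_tasks, n_variates, horizon]` forecast array. The sketch below checks that alignment on a tiny example. The series IDs, target names and forecast values are made up for illustration.

```python
import numpy as np

n_tasks, n_variates, prediction_length = 2, 2, 3
series_ids = ["A", "B"]      # hypothetical item IDs
target = ["sales", "price"]  # hypothetical target columns

# Hypothetical point forecasts of shape (n_tasks, n_variates, horizon)
mean_np = np.arange(n_tasks * n_variates * prediction_length).reshape(
    n_tasks, n_variates, prediction_length
)

# Same layout as in the hunk above
ids = np.repeat(series_ids, n_variates * prediction_length)
names = np.tile(np.repeat(target, prediction_length), n_tasks)
flat = mean_np.ravel()


def row_index(task: int, variate: int, step: int) -> int:
    # Position of (task, variate, step) in the C-order flattened array
    return (task * n_variates + variate) * prediction_length + step


aligned = all(
    ids[row_index(t, v, h)] == series_ids[t]
    and names[row_index(t, v, h)] == target[v]
    and flat[row_index(t, v, h)] == mean_np[t, v, h]
    for t in range(n_tasks)
    for v in range(n_variates)
    for h in range(prediction_length)
)
```

The quantile columns follow the same row order, since `reshape(-1, len(quantile_levels))` only collapses the leading three axes.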
@ -973,11 +1079,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
finetune_kwargs["prediction_length"] = first_window.horizon
finetune_kwargs["batch_size"] = finetune_kwargs.get("batch_size", batch_size)
try:
pipeline = self.fit(inputs=inputs, **finetune_kwargs)
except Exception as e:
msg = f"Finetuning failed with error: {e}. Continuing with the pretrained model."
warnings.warn(msg, category=UserWarning, stacklevel=2)
pipeline = self.fit(inputs=inputs, **finetune_kwargs)
predictions_per_window = []
inference_time_s = 0.0
@ -994,6 +1096,86 @@ class Chronos2Pipeline(BaseChronosPipeline):
return predictions_per_window, inference_time_s
@torch.no_grad()
def embed(
self, inputs: TensorOrArray | Sequence[TensorOrArray], batch_size: int = 256, context_length: int | None = None
) -> tuple[list[torch.Tensor], list[tuple[torch.Tensor, torch.Tensor]]]:
"""
Get encoder embeddings for the given time series.
Parameters
----------
inputs
The time series to get embeddings for, can be one of:
- A 3-dimensional `torch.Tensor` or `np.ndarray` of shape (batch, n_variates, history_length). When `n_variates > 1`, information
will be shared among the different variates of each time series in the batch.
- A list of `torch.Tensor` or `np.ndarray` where each element can either be 1-dimensional of shape (history_length,)
or 2-dimensional of shape (n_variates, history_length). The history_lengths may be different across elements; left-padding
will be applied, if needed.
batch_size
The batch size used for generating embeddings. Note that the batch size here means the total number of time series which are input into the model.
If your data has multiple variates, the effective number of time series tasks in a batch will be lower than this value, by default 256
context_length
The maximum context length used during inference, by default set to the model's default context length
Returns
-------
embeddings
a list of `torch.Tensor` where each element has shape (n_variates, num_patches + 2, d_model) and the number of elements is equal to the number
of target time series (univariate or multivariate) in the `inputs`. The extra +2 is due to embeddings of the [REG] token and a masked output patch token.
loc_scale
a list of tuples with the mean and standard deviation of each time series.
"""
if context_length is None:
context_length = self.model_context_length
if context_length > self.model_context_length:
warnings.warn(
f"The specified context_length {context_length} is greater than the model's default context length {self.model_context_length}. "
f"Resetting context_length to {self.model_context_length}."
)
context_length = self.model_context_length
test_dataset = Chronos2Dataset.convert_inputs(
inputs=inputs,
context_length=context_length,
prediction_length=0,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
mode=DatasetMode.TEST,
)
test_loader = DataLoader(
test_dataset,
batch_size=None,
num_workers=0,
pin_memory=self.model.device.type == "cuda",
shuffle=False,
drop_last=False,
)
all_embeds: list[torch.Tensor] = []
all_loc_scales: list[tuple[torch.Tensor, torch.Tensor]] = []
for batch in test_loader:
assert batch["future_target"] is None
batch_context = batch["context"]
batch_group_ids = batch["group_ids"]
batch_target_idx_ranges = batch["target_idx_ranges"]
encoder_outputs, (locs, scales), *_ = self.model.encode(
context=batch_context.to(device=self.model.device, dtype=torch.float32),
group_ids=batch_group_ids.to(self.model.device),
)
batch_embeds = [encoder_outputs[0][start:end].cpu() for (start, end) in batch_target_idx_ranges]
batch_loc_scales = list(
zip(
[locs[start:end].cpu() for (start, end) in batch_target_idx_ranges],
[scales[start:end].cpu() for (start, end) in batch_target_idx_ranges],
)
)
all_embeds.extend(batch_embeds)
all_loc_scales.extend(batch_loc_scales)
return all_embeds, all_loc_scales
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *args, **kwargs):
"""
@ -1001,9 +1183,25 @@ class Chronos2Pipeline(BaseChronosPipeline):
Supports the same arguments as ``AutoConfig`` and ``AutoModel`` from ``transformers``.
"""
# Check if the model is on S3 and cache it locally first
# NOTE: Only base models (not LoRA adapters) are supported via S3
if str(pretrained_model_name_or_path).startswith("s3://"):
return BaseChronosPipeline.from_pretrained(pretrained_model_name_or_path, *args, **kwargs)
# Check if the hub model_id or local path is a LoRA adapter
if find_adapter_config_file(pretrained_model_name_or_path) is not None:
if not is_peft_available():
raise ImportError(
f"The model at {pretrained_model_name_or_path} is a `peft` adaptor, but `peft` is not available. "
f"Please install `peft` with `pip install peft` to use this model. "
)
from peft import AutoPeftModel
model = AutoPeftModel.from_pretrained(pretrained_model_name_or_path, *args, **kwargs)
model = model.merge_and_unload()
return cls(model=model)
# Handle the case for the base model
config = AutoConfig.from_pretrained(pretrained_model_name_or_path, *args, **kwargs)
assert hasattr(config, "chronos_config"), "Not a Chronos config file"


@ -3,6 +3,7 @@
# Authors: Abdul Fatir Ansari <ansarnd@amazon.com>
import warnings
from typing import TYPE_CHECKING, cast
from torch.utils.data import DataLoader, Dataset
@ -48,11 +49,16 @@ class Chronos2Trainer(Trainer):
train_dataset = cast("Chronos2Dataset", self.train_dataset)
assert train_dataset.batch_size == self.args.train_batch_size, (
f"The batch_size of the train_dataset ({train_dataset.batch_size}) does not match the batch_size "
f"in TrainingArguments ({self.args.train_batch_size}). If you're using a machine with multiple GPUs, "
f"ensure that only a single GPU is visible by setting the CUDA_VISIBLE_DEVICES environment variable."
)
if self.args.train_batch_size > train_dataset.batch_size:
warnings.warn(
f"The batch_size of the train_dataset ({train_dataset.batch_size}) does not match the batch_size "
f"in TrainingArguments ({self.args.train_batch_size}). On machines with multiple GPUs, this may indicate "
f"that multiple GPUs are visible and transformers is using DataParallel for training by default. "
f"This may lead to unnecessary slowdown and unexpected behavior. We strongly recommend setting the CUDA_VISIBLE_DEVICES "
f"environment variable to ensure that only a single GPU is visible.",
category=UserWarning,
stacklevel=3,
)
dataloader_params = {
# Disable automatic batching as we handle batching ourselves
@ -74,11 +80,16 @@ class Chronos2Trainer(Trainer):
eval_dataset = cast("Chronos2Dataset", self.eval_dataset)
assert eval_dataset.batch_size == self.args.eval_batch_size, (
f"The batch_size of the eval_dataset ({eval_dataset.batch_size}) does not match the batch_size "
f"in TrainingArguments ({self.args.eval_batch_size}). If you're using a machine with multiple GPUs, "
f"ensure that only a single GPU is visible by setting the CUDA_VISIBLE_DEVICES environment variable."
)
if self.args.eval_batch_size > eval_dataset.batch_size:
warnings.warn(
f"The batch_size of the eval_dataset ({eval_dataset.batch_size}) does not match the batch_size "
f"in TrainingArguments ({self.args.eval_batch_size}). On machines with multiple GPUs, this may indicate "
f"that multiple GPUs are visible and transformers is using DataParallel for training by default. "
f"This may lead to unnecessary slowdown and unexpected behavior. We strongly recommend setting the CUDA_VISIBLE_DEVICES "
f"environment variable to ensure that only a single GPU is visible.",
category=UserWarning,
stacklevel=3,
)
dataloader_params = {
# Disable automatic batching as we handle batching ourselves
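
Review note on replacing the batch-size assertion with a warning: the mismatch arises because the `Trainer`'s effective batch size scales with the number of visible GPUs, while the dataset was built for a fixed batch size. The sketch below reproduces the comparison in plain Python. It assumes the effective batch size is the per-device batch size times `max(1, n_gpu)`; the helper names are hypothetical.

```python
def effective_batch_size(per_device_batch_size: int, n_gpu: int) -> int:
    """Assumed rule: the per-device batch size is multiplied by the visible GPU count."""
    return per_device_batch_size * max(1, n_gpu)


def needs_warning(dataset_batch_size: int, per_device_batch_size: int, n_gpu: int) -> bool:
    # Same comparison as the new check: warn only when the Trainer's batch size is larger
    return effective_batch_size(per_device_batch_size, n_gpu) > dataset_batch_size


# Four visible GPUs: the Trainer sees 1024 while the dataset batches 256 series
multi_gpu = needs_warning(dataset_batch_size=256, per_device_batch_size=256, n_gpu=4)

# One visible GPU (or n_gpu forced to 1): both sides agree, no warning
single_gpu = needs_warning(dataset_batch_size=256, per_device_batch_size=256, n_gpu=1)
```

Under that assumption, forcing the GPU count to 1 (as `disable_data_parallel=True` does in `fit`) keeps the two batch sizes equal, so the warning should only appear when the user opts out.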


@ -408,6 +408,14 @@ class ChronosBoltPipeline(BaseChronosPipeline):
super().__init__(inner_model=model) # type: ignore
self.model = model
@property
def model_context_length(self) -> int:
return self.model.chronos_config.context_length
@property
def model_prediction_length(self) -> int:
return self.model.chronos_config.prediction_length
@property
def quantiles(self) -> List[float]:
return self.model.config.chronos_config["quantiles"]
@ -487,14 +495,12 @@ class ChronosBoltPipeline(BaseChronosPipeline):
"""
context_tensor = self._prepare_and_validate_context(context=inputs)
model_context_length: int = self.model.config.chronos_config["context_length"]
model_prediction_length: int = self.model.config.chronos_config["prediction_length"]
if prediction_length is None:
prediction_length = model_prediction_length
prediction_length = self.model_prediction_length
if prediction_length > model_prediction_length:
if prediction_length > self.model_prediction_length:
msg = (
f"We recommend keeping prediction length <= {model_prediction_length}. "
f"We recommend keeping prediction length <= {self.model_prediction_length}. "
"The quality of longer predictions may degrade since the model is not optimized for it. "
)
if limit_prediction_length:
@ -507,32 +513,46 @@ class ChronosBoltPipeline(BaseChronosPipeline):
# We truncate the context here because otherwise batches with very long
# context could take up large amounts of GPU memory unnecessarily.
if context_tensor.shape[-1] > model_context_length:
context_tensor = context_tensor[..., -model_context_length:]
if context_tensor.shape[-1] > self.model_context_length:
context_tensor = context_tensor[..., -self.model_context_length :]
# TODO: We unroll the forecast of Chronos Bolt greedily with the full forecast
# horizon that the model was trained with (i.e., 64). This results in variance collapsing
# every 64 steps.
context_tensor = context_tensor.to(
device=self.model.device,
dtype=torch.float32,
)
while remaining > 0:
with torch.no_grad():
prediction = self.model(
context=context_tensor,
).quantile_preds.to(context_tensor)
context_tensor = context_tensor.to(device=self.model.device, dtype=torch.float32)
# First block prediction
with torch.no_grad():
prediction: torch.Tensor = self.model(context=context_tensor).quantile_preds.to(context_tensor)
predictions.append(prediction)
remaining -= prediction.shape[-1]
if remaining <= 0:
break
# NOTE: The following heuristic for better prediction intervals with long-horizon forecasts
# uses all quantiles generated by the model for the first `model_prediction_length` steps,
# concatenating each quantile with the context and generating the next `model_prediction_length` steps.
# The `num_quantiles * num_quantiles` "samples" thus generated are then reduced to `num_quantiles`
# by computing empirical quantiles. Note that this option scales the batch size by `num_quantiles`
# when the `prediction_length` is greater than `model_prediction_length`.
central_idx = torch.abs(torch.tensor(self.quantiles) - 0.5).argmin()
central_prediction = prediction[:, central_idx]
if remaining > 0:
# Expand the context along quantile axis
context_tensor = context_tensor.unsqueeze(1).repeat(1, len(self.quantiles), 1)
context_tensor = torch.cat([context_tensor, central_prediction], dim=-1)
quantile_tensor = torch.tensor(self.quantiles, device=context_tensor.device)
while remaining > 0:
# Append the prediction to context
context_tensor = torch.cat([context_tensor, prediction], dim=-1)[..., -self.model_context_length :]
(batch_size, n_quantiles, context_length) = context_tensor.shape
with torch.no_grad():
# Reshape (batch, n_quantiles, context_length) -> (batch * n_quantiles, context_length)
prediction = self.model(
context=context_tensor.reshape(batch_size * n_quantiles, context_length)
).quantile_preds.to(context_tensor)
# Reshape predictions from (batch * n_quantiles, n_quantiles, model_prediction_length) to (batch, n_quantiles * n_quantiles, model_prediction_length)
prediction = prediction.reshape(batch_size, n_quantiles * n_quantiles, -1)
# Reduce `n_quantiles * n_quantiles` to n_quantiles and transpose back to (batch_size, n_quantiles, model_prediction_length)
prediction = torch.quantile(prediction, q=quantile_tensor, dim=1).transpose(0, 1)
predictions.append(prediction)
remaining -= prediction.shape[-1]
return torch.cat(predictions, dim=-1)[..., :prediction_length].to(dtype=torch.float32, device="cpu")
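
Review note on the long-horizon heuristic for Chronos-Bolt: each of the `n_quantiles` extended contexts yields `n_quantiles` predicted quantiles, and the resulting `n_quantiles * n_quantiles` values per step are reduced back to `n_quantiles` via empirical quantiles. The reshape and reduction can be sketched as follows. NumPy stands in for PyTorch, and the random array is a hypothetical placeholder for the model output.

```python
import numpy as np

quantiles = np.array([0.1, 0.5, 0.9])
batch_size, n_quantiles, horizon = 2, len(quantiles), 4

# Hypothetical model output for the expanded batch:
# one row per (series, quantile path), each with n_quantiles predicted quantiles.
rng = np.random.default_rng(0)
raw = rng.normal(size=(batch_size * n_quantiles, n_quantiles, horizon))

# (batch * n_quantiles, n_quantiles, horizon) -> (batch, n_quantiles * n_quantiles, horizon)
pooled = raw.reshape(batch_size, n_quantiles * n_quantiles, horizon)

# Empirical quantiles over the pooled "samples"; the quantile axis comes out first,
# so swap it back to get (batch, n_quantiles, horizon).
reduced = np.quantile(pooled, q=quantiles, axis=1).swapaxes(0, 1)
```

As the NOTE in the hunk states, this multiplies the batch size by `n_quantiles` whenever `prediction_length` exceeds the model's native horizon, so memory use grows accordingly.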

src/chronos/df_utils.py Normal file

@ -0,0 +1,341 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

# Authors: Abdul Fatir Ansari <ansarnd@amazon.com>

import warnings
from typing import TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    import pandas as pd

def _validate_df_types_and_cast(
    df: "pd.DataFrame",
    future_df: "pd.DataFrame | None",
    target_columns: list[str],
    id_column: str = "item_id",
    timestamp_column: str = "timestamp",
) -> tuple["pd.DataFrame", "pd.DataFrame | None"]:
    import pandas as pd

    astype_dict = {}
    future_astype_dict = {}
    for col in df.columns.drop([id_column, timestamp_column]):
        col_dtype = df[col].dtype
        if col in target_columns and not pd.api.types.is_numeric_dtype(df[col]):
            raise ValueError(f"All target columns must be numeric but got {col=} with dtype={col_dtype}")
        if (
            pd.api.types.is_object_dtype(df[col])
            or pd.api.types.is_string_dtype(df[col])
            or isinstance(col_dtype, pd.CategoricalDtype)
        ):
            astype_dict[col] = "category"
        elif pd.api.types.is_numeric_dtype(df[col]) or pd.api.types.is_bool_dtype(df[col]):
            astype_dict[col] = "float32"
        else:
            raise ValueError(
                f"All columns must contain numeric, object, category, string, or bool dtype but got {col=} with dtype={col_dtype}"
            )
        if future_df is not None and col in future_df.columns:
            if future_df[col].dtype != col_dtype:
                raise ValueError(
                    f"Column {col} in future_df has dtype {future_df[col].dtype} but column in df has dtype {col_dtype}"
                )
            future_astype_dict[col] = astype_dict[col]
    df = df.astype(astype_dict, copy=True)
    if future_df is not None:
        future_df = future_df.astype(future_astype_dict, copy=True)
    return df, future_df

def validate_df_inputs(
    df: "pd.DataFrame",
    future_df: "pd.DataFrame | None",
    target_columns: list[str],
    prediction_length: int,
    id_column: str = "item_id",
    timestamp_column: str = "timestamp",
) -> tuple["pd.DataFrame", "pd.DataFrame | None", str, list[int], np.ndarray]:
    """
    Validates and prepares dataframe inputs

    Parameters
    ----------
    df
        Input dataframe containing time series data with columns:
        - id_column: Identifier for each time series
        - timestamp_column: Timestamps for each observation
        - target_columns: One or more target variables to forecast
        - Additional columns are treated as covariates
    future_df
        Optional dataframe containing future covariate values with columns:
        - id_column: Identifier for each time series
        - timestamp_column: Future timestamps
        - Subset of covariate columns from df
    target_columns
        Names of target columns to forecast
    prediction_length
        Number of future time steps to predict
    id_column
        Name of column containing time series identifiers
    timestamp_column
        Name of column containing timestamps

    Returns
    -------
    A tuple containing:
        - Validated and sorted input dataframe
        - Validated and sorted future dataframe (if provided)
        - Inferred frequency of the time series
        - List of series lengths from input dataframe
        - Original order of time series IDs

    Raises
    ------
    ValueError
        If validation fails for:
        - Missing required columns
        - Invalid data types
        - Inconsistent frequencies
        - Insufficient data points
        - Mismatched series between df and future_df
        - Invalid future_df lengths
    """
    import pandas as pd

    required_cols = [id_column, timestamp_column] + target_columns
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"df does not contain all expected columns. Missing columns: {missing_cols}")
    if future_df is not None:
        future_required_cols = [id_column, timestamp_column]
        missing_future_cols = [col for col in future_required_cols if col not in future_df.columns]
        targets_in_future = [col for col in future_df.columns if col in target_columns]
        extra_future_cols = [col for col in future_df.columns if col not in df.columns]
        if missing_future_cols:
            raise ValueError(
                f"future_df does not contain all expected columns. Missing columns: {missing_future_cols}"
            )
        if targets_in_future:
            raise ValueError(
                f"future_df cannot contain target columns. Target columns found in future_df: {targets_in_future}"
            )
        if extra_future_cols:
            raise ValueError(f"future_df cannot contain columns not present in df. Extra columns: {extra_future_cols}")
    df, future_df = _validate_df_types_and_cast(
        df, future_df, id_column=id_column, timestamp_column=timestamp_column, target_columns=target_columns
    )
    # Get the original order of time series IDs
    original_order = df[id_column].unique()
    # Sort and prepare df
    df[timestamp_column] = pd.to_datetime(df[timestamp_column])
    df = df.sort_values([id_column, timestamp_column])
    # Get series lengths
    series_lengths = df[id_column].value_counts(sort=False).to_list()

    def validate_freq(timestamps: pd.DatetimeIndex, series_id: str):
        freq = pd.infer_freq(timestamps)
        if not freq:
            raise ValueError(f"Could not infer frequency for series {series_id}")
        return freq

    # Validate each series
    all_freqs = []
    start_idx = 0
    timestamp_index = pd.DatetimeIndex(df[timestamp_column])
    for length in series_lengths:
        if length < 3:
            series_id = df[id_column].iloc[start_idx]
            raise ValueError(
                f"Every time series must have at least 3 data points, found {length=} for series {series_id}"
            )
        timestamps = timestamp_index[start_idx : start_idx + length]
        series_id = df[id_column].iloc[start_idx]
        all_freqs.append(validate_freq(timestamps, series_id))
        start_idx += length
    if len(set(all_freqs)) > 1:
        raise ValueError("All time series must have the same frequency")
    inferred_freq = all_freqs[0]
    # Sort future_df if provided and validate its series lengths
    future_series_lengths = None
    if future_df is not None:
        future_df[timestamp_column] = pd.to_datetime(future_df[timestamp_column])
        future_df = future_df.sort_values([id_column, timestamp_column])
        # Validate that future_df contains all series from df
        context_ids = set(df[id_column].unique())
        future_ids = set(future_df[id_column].unique())
        if context_ids != future_ids:
            raise ValueError("future_df must contain the same time series IDs as df")
        future_series_lengths = future_df[id_column].value_counts(sort=False)
        if (future_series_lengths != prediction_length).any():
            invalid_series = future_series_lengths[future_series_lengths != prediction_length]
            raise ValueError(
                f"future_df must contain {prediction_length=} values for each series, "
                f"but found series with different lengths: {invalid_series.to_dict()}"
            )
    return df, future_df, inferred_freq, series_lengths, original_order

def convert_df_input_to_list_of_dicts_input(
    df: "pd.DataFrame",
    future_df: "pd.DataFrame | None",
    target_columns: list[str],
    prediction_length: int,
    id_column: str = "item_id",
    timestamp_column: str = "timestamp",
    validate_inputs: bool = True,
    freq: str | None = None,
) -> tuple[list[dict[str, np.ndarray | dict[str, np.ndarray]]], np.ndarray, dict[str, "pd.DatetimeIndex"]]:
    """
    Convert from dataframe input format to a list of dictionaries input format.

    Parameters
    ----------
    df
        Input dataframe containing time series data with columns:
        - id_column: Identifier for each time series
        - timestamp_column: Timestamps for each observation
        - target_columns: One or more target variables to forecast
        - Additional columns are treated as covariates
    future_df
        Optional dataframe containing future covariate values with columns:
        - id_column: Identifier for each time series
        - timestamp_column: Future timestamps
        - Subset of covariate columns from df
    target_columns
        Names of target columns to forecast
    prediction_length
        Number of future time steps to predict
    id_column
        Name of column containing time series identifiers
    timestamp_column
        Name of column containing timestamps
    validate_inputs
        [ADVANCED] When True (default), validates dataframes before prediction. Setting to False removes the
        validation overhead, but may silently lead to wrong predictions if data is misformatted. When False, you
        must ensure: (1) all dataframes are sorted by (id_column, timestamp_column); (2) future_df (if provided)
        has the same item IDs as df with exactly prediction_length rows of future timestamps per item; (3) all
        timestamps are regularly spaced (e.g., with hourly frequency).
    freq
        Frequency string for timestamp generation (e.g., "h", "D", "W"). Can only be used
        when validate_inputs=False. When provided, skips frequency inference from the data.

    Returns
    -------
    A tuple containing:
        - Time series converted to list of dictionaries format
        - Original order of time series IDs
        - Dictionary mapping series IDs to future time index
    """
    import pandas as pd

    if freq is not None and validate_inputs:
        raise ValueError(
            "freq can only be provided when validate_inputs=False. "
            "When using freq with validate_inputs=False, you must ensure: "
            "(1) all dataframes are sorted by (id_column, timestamp_column); "
            "(2) future_df (if provided) has the same item IDs as df with exactly "
            "prediction_length rows of future timestamps per item; "
            "(3) all timestamps are regularly spaced."
        )
    if validate_inputs:
        df, future_df, freq, series_lengths, original_order = validate_df_inputs(
            df,
            future_df=future_df,
            id_column=id_column,
            timestamp_column=timestamp_column,
            target_columns=target_columns,
            prediction_length=prediction_length,
        )
    else:
        # Get the original order of time series IDs
        original_order = df[id_column].unique()
        # Get series lengths
        series_lengths = df[id_column].value_counts(sort=False).to_list()
        # If freq is not provided, infer from the first series with >= 3 points
        if freq is None:
            timestamp_index = pd.DatetimeIndex(df[timestamp_column])
            start_idx = 0
            for length in series_lengths:
                if length < 3:
                    start_idx += length
                    continue
                timestamps = timestamp_index[start_idx : start_idx + length]
                freq = pd.infer_freq(timestamps)
                break
            assert freq is not None, "validate_inputs is False, but could not infer frequency from the dataframe"
    # Convert to list of dicts format
    inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = []
    prediction_timestamps: dict[str, pd.DatetimeIndex] = {}
    indptr = np.concatenate([[0], np.cumsum(series_lengths)]).astype("int64")
    target_array = df[target_columns].to_numpy().T  # Shape: (n_targets, len(df))
    last_ts = pd.DatetimeIndex(df[timestamp_column].iloc[indptr[1:] - 1])  # Shape: (n_series,)
    offset = pd.tseries.frequencies.to_offset(freq)
    with warnings.catch_warnings():
        # Silence PerformanceWarning for non-vectorized offsets https://github.com/pandas-dev/pandas/blob/95624ca2e99b0/pandas/core/arrays/datetimes.py#L822
        warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning)
        # Generate all prediction timestamps at once by stacking offsets into shape (n_series * prediction_length)
        prediction_timestamps_array = pd.DatetimeIndex(
            np.dstack([last_ts + step * offset for step in range(1, prediction_length + 1)]).ravel()
        )
    past_covariates_dict = {
        col: df[col].to_numpy() for col in df.columns if col not in [id_column, timestamp_column] + target_columns
    }
    future_covariates_dict = {}
    if future_df is not None:
        for col in future_df.columns.drop([id_column, timestamp_column]):
            future_covariates_dict[col] = future_df[col].to_numpy()
        if validate_inputs:
            if (pd.DatetimeIndex(future_df[timestamp_column]) != pd.DatetimeIndex(prediction_timestamps_array)).any():
                raise ValueError(
                    "future_df timestamps do not match the expected prediction timestamps. "
                    "You can disable this check by setting `validate_inputs=False`"
                )
    for i in range(len(series_lengths)):
        start_idx, end_idx = indptr[i], indptr[i + 1]
        future_start_idx, future_end_idx = i * prediction_length, (i + 1) * prediction_length
        series_id = df[id_column].iloc[start_idx]
        prediction_timestamps[series_id] = prediction_timestamps_array[future_start_idx:future_end_idx]
        task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_array[:, start_idx:end_idx]}
        if len(past_covariates_dict) > 0:
            task["past_covariates"] = {col: values[start_idx:end_idx] for col, values in past_covariates_dict.items()}
        if len(future_covariates_dict) > 0:
            task["future_covariates"] = {
                col: values[future_start_idx:future_end_idx] for col, values in future_covariates_dict.items()
            }
        inputs.append(task)
    assert len(inputs) == len(series_lengths)
    return inputs, original_order, prediction_timestamps
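
The conversion above relies on two ideas: cumulative series lengths act as offsets (`indptr`) into one flat, series-sorted table, and future timestamps are generated as `last_ts + step * offset`. The sketch below reproduces both with the standard library only. It is a simplified illustration under stated assumptions, not the module's code: `infer_fixed_step` and `split_flat_series` are hypothetical names, the step is a fixed `timedelta` inferred per series (so calendar frequencies such as month-end, which `pd.infer_freq` handles, are out of scope), and the "at least 3 points" rule mirrors the validation above.

```python
from datetime import datetime, timedelta
from itertools import accumulate


def infer_fixed_step(timestamps):
    """Return the constant gap between timestamps, or None if irregular or too short."""
    if len(timestamps) < 3:
        return None
    gaps = {later - earlier for earlier, later in zip(timestamps, timestamps[1:])}
    return gaps.pop() if len(gaps) == 1 else None


def split_flat_series(values, timestamps, series_lengths, prediction_length):
    """Slice flat, series-sorted lists into per-series tasks with future timestamps."""
    # indptr[i]:indptr[i + 1] is the row range of series i (CSR-style offsets).
    indptr = [0, *accumulate(series_lengths)]
    tasks = []
    for start, end in zip(indptr[:-1], indptr[1:]):
        step = infer_fixed_step(timestamps[start:end])
        if step is None:
            raise ValueError(f"Could not infer a fixed step for rows {start}:{end}")
        last_ts = timestamps[end - 1]
        future = [last_ts + k * step for k in range(1, prediction_length + 1)]
        tasks.append({"target": values[start:end], "future_timestamps": future})
    return indptr, tasks


hour = timedelta(hours=1)
t0 = datetime(2024, 1, 1)
# Two hourly series stored back to back: 4 rows, then 3 rows.
timestamps = [t0 + i * hour for i in range(4)] + [t0 + i * hour for i in range(3)]
values = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0]
indptr, tasks = split_flat_series(values, timestamps, series_lengths=[4, 3], prediction_length=2)
```

Slicing by offsets avoids a per-series group-by, which is presumably why the real function can skip validation cheaply when `validate_inputs=False`; the price is that unsorted or irregular input is then sliced incorrectly without any error.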


@ -0,0 +1,35 @@
{
"alpha_pattern": {},
"auto_mapping": {
"base_model_class": "Chronos2Model",
"parent_library": "chronos.chronos2.model"
},
"base_model_name_or_path": "test/dummy-chronos2-model",
"bias": "none",
"fan_in_fan_out": false,
"inference_mode": true,
"init_lora_weights": true,
"layer_replication": null,
"layers_pattern": null,
"layers_to_transform": null,
"loftq_config": {},
"lora_alpha": 16,
"lora_dropout": 0.0,
"megatron_config": null,
"megatron_core": "megatron.core",
"modules_to_save": null,
"peft_type": "LORA",
"r": 8,
"rank_pattern": {},
"revision": null,
"target_modules": [
"self_attention.q",
"self_attention.k",
"self_attention.o",
"output_patch_embedding.output_layer",
"self_attention.v"
],
"task_type": null,
"use_dora": false,
"use_rslora": false
}
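
For readers unfamiliar with the adapter fields above: in the standard LoRA formulation the low-rank update is multiplied by `lora_alpha / r`, or by `lora_alpha / sqrt(r)` when rank-stabilized LoRA is enabled. The snippet below is a small sketch of that arithmetic on a subset of the fixture's values; `lora_scaling` is a hypothetical helper written for illustration, not a function from this repository or from PEFT.

```python
import json
import math

# Subset of the adapter configuration shown above.
config = json.loads('{"r": 8, "lora_alpha": 16, "use_rslora": false, "lora_dropout": 0.0}')


def lora_scaling(cfg):
    """Scaling factor applied to the low-rank update B @ A."""
    rank = cfg["r"]
    if cfg.get("use_rslora", False):
        # Rank-stabilized variant divides by sqrt(r) instead of r.
        return cfg["lora_alpha"] / math.sqrt(rank)
    return cfg["lora_alpha"] / rank


scaling = lora_scaling(config)
```

With `r = 8` and `lora_alpha = 16`, the update applied to each module listed in `target_modules` is scaled by a factor of 2.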

Binary file not shown.


@ -3,6 +3,8 @@
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
import torch
@ -12,7 +14,14 @@ from chronos import (
ChronosPipeline,
MeanScaleUniformBins,
)
from test.util import validate_tensor
from test.util import create_df, get_forecast_start_times, validate_tensor
DUMMY_MODEL_PATH = Path(__file__).parent / "dummy-chronos-model"
@pytest.fixture
def pipeline() -> ChronosPipeline:
return BaseChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu")
def test_base_chronos_pipeline_loads_from_huggingface():
@ -167,11 +176,7 @@ def test_tokenizer_random_data(use_eos_token: bool):
@pytest.mark.parametrize("model_dtype", [torch.float32, torch.bfloat16])
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_predict(model_dtype: torch.dtype, input_dtype: torch.dtype):
pipeline = ChronosPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos-model",
device_map="cpu",
torch_dtype=model_dtype,
)
pipeline = ChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=model_dtype)
context = 10 * torch.rand(size=(4, 16)) + 10
context = context.to(dtype=input_dtype)
@ -238,11 +243,7 @@ def test_pipeline_predict_quantiles(
prediction_length: int,
quantile_levels: list[int],
):
pipeline = ChronosPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos-model",
device_map="cpu",
torch_dtype=model_dtype,
)
pipeline = ChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=model_dtype)
context = 10 * torch.rand(size=(4, 16)) + 10
context = context.to(dtype=input_dtype)
@ -284,11 +285,7 @@ def test_pipeline_predict_quantiles(
@pytest.mark.parametrize("model_dtype", [torch.float32, torch.bfloat16])
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_embed(model_dtype: torch.dtype, input_dtype: torch.dtype):
pipeline = ChronosPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos-model",
device_map="cpu",
torch_dtype=model_dtype,
)
pipeline = ChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=model_dtype)
d_model = pipeline.model.model.config.d_model
context = 10 * torch.rand(size=(4, 16)) + 10
context = context.to(dtype=input_dtype)
@ -312,6 +309,88 @@ def test_pipeline_embed(model_dtype: torch.dtype, input_dtype: torch.dtype):
validate_tensor(scale, shape=(1,), dtype=torch.float32)
@pytest.mark.parametrize(
"context_setup, expected_rows",
[
# Targets only
({}, 6), # 2 series * 3 predictions
# Different context lengths
(
{"series_ids": ["X", "Y", "Z"], "n_points": [10, 17, 56], "target_cols": ["custom_target"]},
9,
), # 3 series * 3 predictions
],
)
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
def test_predict_df_works_for_valid_inputs(pipeline, context_setup, expected_rows, freq):
prediction_length = 3
df = create_df(**context_setup, freq=freq)
forecast_start_times = get_forecast_start_times(df, freq)
series_ids = context_setup.get("series_ids", ["A", "B"])
target_columns = context_setup.get("target_cols", ["target"])
n_series = len(series_ids)
n_targets = len(target_columns)
result = pipeline.predict_df(df, target=target_columns[0], prediction_length=prediction_length)
assert len(result) == expected_rows
assert "item_id" in result.columns and np.all(
result["item_id"].to_numpy() == np.array(series_ids).repeat(n_targets * prediction_length)
)
assert "target_name" in result.columns and np.all(
result["target_name"].to_numpy() == np.tile(np.array(target_columns).repeat(prediction_length), n_series)
)
assert "timestamp" in result.columns and np.all(
result.groupby("item_id")["timestamp"].min().to_numpy() == pd.to_datetime(forecast_start_times).to_numpy()
)
assert "predictions" in result.columns
assert all(str(q) in result.columns for q in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline):
df = create_df()
# Make timestamps non-uniform for series A
df.loc[df["item_id"] == "A", "timestamp"] = [
"2023-01-01",
"2023-01-02",
"2023-01-04",
"2023-01-05",
"2023-01-06",
"2023-01-07",
"2023-01-08",
"2023-01-09",
"2023-01-10",
"2023-01-11",
]
with pytest.raises(ValueError, match="not infer frequency"):
pipeline.predict_df(df)
def test_predict_df_with_inconsistent_frequencies_raises_error(pipeline):
df = pd.DataFrame(
{
"item_id": ["A", "A", "A", "A", "A", "B", "B", "B", "B", "B"],
"timestamp": [
"2023-01-01",
"2023-01-02",
"2023-01-03",
"2023-01-04",
"2023-01-05",
"2023-01-01",
"2023-02-01",
"2023-03-01",
"2023-04-01",
"2023-05-01",
],
"target": [1.0] * 10,
}
)
with pytest.raises(ValueError, match="same frequency"):
pipeline.predict_df(df)
@pytest.mark.parametrize("n_tokens", [10, 1000, 10000])
def test_tokenizer_number_of_buckets(n_tokens):
config = ChronosConfig(


@ -13,11 +13,10 @@ import pytest
import torch
from chronos import BaseChronosPipeline, Chronos2Pipeline
from chronos.chronos2.dataset import convert_df_input_to_list_of_dicts_input
from chronos.chronos2.config import Chronos2CoreConfig
from chronos.chronos2.layers import MHA
from test.util import validate_tensor
from chronos.df_utils import convert_df_input_to_list_of_dicts_input
from test.util import create_df, create_future_df, get_forecast_start_times, validate_tensor, timeout_callback
DUMMY_MODEL_PATH = Path(__file__).parent / "dummy-chronos2-model"
@ -35,6 +34,14 @@ def test_base_chronos2_pipeline_loads_from_s3():
BaseChronosPipeline.from_pretrained("s3://autogluon/chronos-2", device_map="cpu")
def test_base_chronos2_pipeline_loads_from_hf():
BaseChronosPipeline.from_pretrained("amazon/chronos-2", device_map="cpu")
def test_chronos2_lora_pipeline_loads_from_disk():
Chronos2Pipeline.from_pretrained(Path(__file__).parent / "dummy-chronos2-lora", device_map="cpu")
@pytest.mark.parametrize(
"inputs, prediction_length, expected_output_shapes",
[
@ -324,7 +331,7 @@ def test_when_input_is_invalid_then_predict_raises_value_error(pipeline, inputs,
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_predict_can_handle_different_model_and_input_dtypes(dtype: torch.dtype, input_dtype: torch.dtype):
pipeline = BaseChronosPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos2-model", device_map="cpu", dtype=dtype
Path(__file__).parent / "dummy-chronos2-model", device_map="cpu", torch_dtype=dtype
)
context = 10 * torch.rand(size=(4, 3, 16)) + 10
context = context.to(dtype=input_dtype)
@ -337,6 +344,35 @@ def test_pipeline_predict_can_handle_different_model_and_input_dtypes(dtype: tor
validate_tensor(quantiles_item, (3, expected_num_quantiles, 7), dtype=torch.float32)
@pytest.mark.parametrize(
"inputs, expected_output_shapes",
[
# NOTE: d_model for the dummy model is 6
# Homogeneous univariate task
(torch.rand(4, 1, 16), [(1, 3, 6)] * 4),
# Homogeneous multivariate task
(torch.rand(4, 3, 37), [(3, 5, 6)] * 4),
# Heterogeneous tasks with different history lengths
(
[torch.rand(100), torch.rand(2, 150), torch.rand(120)],
[(1, 12, 6), (2, 12, 6), (1, 12, 6)],
),
],
)
def test_when_input_is_valid_then_pipeline_can_embed(pipeline, inputs, expected_output_shapes):
embeds, loc_scales = pipeline.embed(inputs)
assert (
isinstance(embeds, list)
and len(embeds) == len(expected_output_shapes)
and len(loc_scales) == len(expected_output_shapes)
)
for embed, loc_scale, expected_shape in zip(embeds, loc_scales, expected_output_shapes):
validate_tensor(embed, expected_shape, dtype=torch.float32)
validate_tensor(loc_scale[0], (expected_shape[0], 1), dtype=torch.float32)
validate_tensor(loc_scale[1], (expected_shape[0], 1), dtype=torch.float32)
@pytest.mark.parametrize(
"task_kwargs",
[
@ -384,81 +420,54 @@ def test_pipeline_can_evaluate_on_dummy_fev_task(pipeline, task_kwargs):
assert isinstance(eval_summary["test_error"], float)
def create_df(series_ids=["A", "B"], n_points=[10, 10], target_cols=["target"], covariates=None, freq="h"):
"""Helper to create test context DataFrames."""
series_dfs = []
for series_id, length in zip(series_ids, n_points):
series_data = {"item_id": series_id, "timestamp": pd.date_range(end="2001-10-01", periods=length, freq=freq)}
for target_col in target_cols:
series_data[target_col] = np.random.randn(length)
if covariates:
for cov in covariates:
series_data[cov] = np.random.randn(length)
series_dfs.append(pd.DataFrame(series_data))
return pd.concat(series_dfs, ignore_index=True)
def create_future_df(forecast_start_times: list, series_ids=["A", "B"], n_points=[5, 5], covariates=None, freq="h"):
"""Helper to create test future DataFrames."""
series_dfs = []
for series_id, length, start in zip(series_ids, n_points, forecast_start_times):
series_data = {"item_id": series_id, "timestamp": pd.date_range(start=start, periods=length, freq=freq)}
if covariates:
for cov in covariates:
series_data[cov] = np.random.randn(length)
series_dfs.append(pd.DataFrame(series_data))
return pd.concat(series_dfs, ignore_index=True)
def get_forecast_start_times(df, freq="h"):
context_end_times = df.groupby("item_id")["timestamp"].max()
forecast_start_times = [pd.date_range(end_time, periods=2, freq=freq)[-1] for end_time in context_end_times]
return forecast_start_times
@pytest.mark.parametrize(
"context_setup, future_setup, expected_rows",
"context_setup, future_setup",
[
# Targets only
({}, None, 6), # 2 series * 3 predictions
({}, None),
# Multiple targets with different context lengths
(
{"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]},
None,
18,
), # 2 series * 3 targets * 3 predictions
({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
# With past covariates
({"covariates": ["cov1"]}, None, 6),
({"covariates": ["cov1"]}, None),
# With future covariates
({"covariates": ["cov1"]}, {"covariates": ["cov1"], "n_points": [3, 3]}, 6),
({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
# With past-only and future covariates
({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"], "n_points": [3, 3]}, 6),
({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
# With past-only and future covariates and different series order
(
{"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
{
"series_ids": ["B", "C", "A", "Z"],
"covariates": ["cov1"],
"n_points": [3, 3, 3, 3],
},
12,
{"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
),
],
)
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
def test_predict_df_works_for_valid_inputs(pipeline, context_setup, future_setup, expected_rows, freq):
prediction_length = 3
@pytest.mark.parametrize("prediction_length", [1, 4])
@pytest.mark.parametrize("validate_inputs", [True, False])
def test_predict_df_works_for_valid_inputs(
pipeline, context_setup, future_setup, freq, validate_inputs, prediction_length
):
df = create_df(**context_setup, freq=freq)
forecast_start_times = get_forecast_start_times(df, freq)
future_df = create_future_df(forecast_start_times, **future_setup, freq=freq) if future_setup else None
if future_setup:
series_ids = future_setup.get("series_ids", ["A", "B"])
future_setup_with_n_points = {**future_setup, "n_points": [prediction_length] * len(series_ids)}
future_df = create_future_df(forecast_start_times, **future_setup_with_n_points, freq=freq)
else:
future_df = None
series_ids = context_setup.get("series_ids", ["A", "B"])
target_columns = context_setup.get("target_cols", ["target"])
n_series = len(series_ids)
n_targets = len(target_columns)
result = pipeline.predict_df(df, future_df=future_df, target=target_columns, prediction_length=prediction_length)
result = pipeline.predict_df(
df,
future_df=future_df,
target=target_columns,
prediction_length=prediction_length,
validate_inputs=validate_inputs,
)
expected_rows = n_series * n_targets * prediction_length
assert len(result) == expected_rows
assert "item_id" in result.columns and np.all(
result["item_id"].to_numpy() == np.array(series_ids).repeat(n_targets * prediction_length)
@ -513,9 +522,10 @@ def test_predict_df_future_df_validation_errors(pipeline, future_data, error_mat
pipeline.predict_df(df, future_df=future_df)
def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline):
@pytest.mark.parametrize("validate_inputs", [True, False])
def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline, validate_inputs):
df = create_df()
# Make timestamps non-uniform for series A
# Make timestamps non-uniform for series A (first series)
df.loc[df["item_id"] == "A", "timestamp"] = [
"2023-01-01",
"2023-01-02",
@ -529,8 +539,8 @@ def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline):
"2023-01-11",
]
with pytest.raises(ValueError, match="not infer frequency"):
pipeline.predict_df(df)
with pytest.raises((ValueError, AssertionError), match="not infer frequency"):
pipeline.predict_df(df, validate_inputs=validate_inputs)
def test_predict_df_with_inconsistent_frequencies_raises_error(pipeline):
@ -567,26 +577,80 @@ def test_predict_df_with_future_df_missing_series_raises_error(pipeline):
pipeline.predict_df(df, future_df=future_df)
def test_predict_df_with_future_df_with_different_lengths_raises_error(pipeline):
df = create_df(series_ids=["A", "B"], covariates=["cov1"])
future_df = create_future_df(
get_forecast_start_times(df), series_ids=["A", "B"], n_points=[3, 7], covariates=["cov1"]
)
with pytest.raises(ValueError, match="all time series must have length"):
pipeline.predict_df(df, future_df=future_df, prediction_length=3)
def test_predict_df_with_future_df_with_different_freq_raises_error(pipeline):
df = create_df(series_ids=["A", "B"], covariates=["cov1"], freq="h")
future_df = create_future_df(
get_forecast_start_times(df), series_ids=["A", "B"], n_points=[3, 3], covariates=["cov1"], freq="D"
)
with pytest.raises(ValueError, match="must have the same frequency as context"):
with pytest.raises(ValueError, match="future_df timestamps do not match"):
pipeline.predict_df(df, future_df=future_df, prediction_length=3)
def test_predict_df_with_future_df_with_different_lengths_raises_error(pipeline):
df = create_df(series_ids=["A", "B"], covariates=["cov1"])
future_df = create_future_df(
get_forecast_start_times(df), series_ids=["A", "B"], n_points=[3, 7], covariates=["cov1"]
)
with pytest.raises(ValueError, match="future_df must contain prediction"):
pipeline.predict_df(df, future_df=future_df, prediction_length=3)
@pytest.mark.parametrize(
"context_setup, future_setup",
[
# Targets only
({}, None),
# Multiple targets with different context lengths
({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
# With past covariates
({"covariates": ["cov1"]}, None),
# With future covariates
({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
# With past-only and future covariates
({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
# With past-only and future covariates and different series order
(
{"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
{"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
),
],
)
@pytest.mark.parametrize("prediction_length", [1, 4])
def test_predict_df_outputs_different_results_with_cross_learning_enabled(
pipeline, context_setup, future_setup, prediction_length
):
freq = "h"
df = create_df(**context_setup, freq=freq)
forecast_start_times = get_forecast_start_times(df, freq)
if future_setup:
series_ids = future_setup.get("series_ids", ["A", "B"])
future_setup_with_n_points = {**future_setup, "n_points": [prediction_length] * len(series_ids)}
future_df = create_future_df(forecast_start_times, **future_setup_with_n_points, freq=freq)
else:
future_df = None
series_ids = context_setup.get("series_ids", ["A", "B"])
target_columns = context_setup.get("target_cols", ["target"])
result_with_cross_learning = pipeline.predict_df(
df,
future_df=future_df,
target=target_columns,
prediction_length=prediction_length,
cross_learning=True,
)
result_without_cross_learning = pipeline.predict_df(
df,
future_df=future_df,
target=target_columns,
prediction_length=prediction_length,
cross_learning=False,
)
assert not np.array_equal(result_with_cross_learning["predictions"], result_without_cross_learning["predictions"])
@pytest.mark.parametrize(
"inputs, prediction_length, expected_output_shapes",
[
@ -672,12 +736,20 @@ def test_predict_df_with_future_df_with_different_freq_raises_error(pipeline):
),
],
)
@pytest.mark.parametrize("finetune_mode", ["full", "lora"])
def test_when_input_is_valid_then_pipeline_can_be_finetuned(
pipeline, inputs, prediction_length, expected_output_shapes
pipeline, inputs, prediction_length, expected_output_shapes, finetune_mode
):
# Get outputs before fine-tuning
orig_outputs_before = pipeline.predict(inputs, prediction_length=prediction_length)
ft_pipeline = pipeline.fit(inputs, prediction_length=prediction_length, num_steps=5, min_past=1, batch_size=32)
ft_pipeline = pipeline.fit(
inputs,
prediction_length=prediction_length,
num_steps=5,
min_past=1,
batch_size=32,
finetune_mode=finetune_mode,
)
# Get outputs from fine-tuned pipeline
ft_outputs = ft_pipeline.predict(inputs, prediction_length=prediction_length)
# Get outputs from original pipeline after fine-tuning
@ -853,40 +925,36 @@ def test_when_input_time_series_are_too_short_then_finetuning_raises_error(pipel
@pytest.mark.parametrize(
"context_setup, future_setup, expected_rows",
"context_setup, future_setup",
[
# Targets only
({}, None, 6), # 2 series * 3 predictions
({}, None),
# Multiple targets with different context lengths
(
{"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]},
None,
18,
), # 2 series * 3 targets * 3 predictions
({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
# With past covariates
({"covariates": ["cov1"]}, None, 6),
({"covariates": ["cov1"]}, None),
# With future covariates
({"covariates": ["cov1"]}, {"covariates": ["cov1"], "n_points": [3, 3]}, 6),
({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
# With past-only and future covariates
({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"], "n_points": [3, 3]}, 6),
({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
# With past-only and future covariates and different series order
(
{"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
{
"series_ids": ["B", "C", "A", "Z"],
"covariates": ["cov1"],
"n_points": [3, 3, 3, 3],
},
12,
{"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
),
],
)
@pytest.mark.parametrize("freq", ["h", "D", "ME"])
def test_two_step_finetuning_with_df_input_works(pipeline, context_setup, future_setup, expected_rows, freq):
def test_two_step_finetuning_with_df_input_works(pipeline, context_setup, future_setup, freq):
prediction_length = 3
df = create_df(**context_setup, freq=freq)
forecast_start_times = get_forecast_start_times(df, freq)
future_df = create_future_df(forecast_start_times, **future_setup, freq=freq) if future_setup else None
if future_setup:
series_ids = future_setup.get("series_ids", ["A", "B"])
future_setup_with_n_points = {**future_setup, "n_points": [prediction_length] * len(series_ids)}
future_df = create_future_df(forecast_start_times, **future_setup_with_n_points, freq=freq)
else:
future_df = None
series_ids = context_setup.get("series_ids", ["A", "B"])
target_columns = context_setup.get("target_cols", ["target"])
@ -919,6 +987,7 @@ def test_two_step_finetuning_with_df_input_works(pipeline, context_setup, future
)
# Check predictions from the fine-tuned model are valid
expected_rows = n_series * n_targets * prediction_length
assert len(result) == expected_rows
assert "item_id" in result.columns and np.all(
result["item_id"].to_numpy() == np.array(series_ids).repeat(n_targets * prediction_length)
@ -939,6 +1008,17 @@ def test_two_step_finetuning_with_df_input_works(pipeline, context_setup, future
assert not np.allclose(orig_result_before["predictions"].to_numpy(), result["predictions"].to_numpy())
def test_when_predict_df_called_with_timeout_callback_then_timeout_error_is_raised(pipeline):
num_series = 1000
large_df = create_df(series_ids=[j for j in range(num_series)], n_points=[2048] * num_series)
with pytest.raises(TimeoutError, match="time limit exceeded"):
pipeline.predict_df(
large_df,
prediction_length=48,
after_batch=timeout_callback(0.1),
)
@pytest.mark.parametrize("attn_implementation", ["eager", "sdpa"])
def test_pipeline_works_with_different_attention_implementations(attn_implementation):
"""Test that the pipeline works with different attention implementations."""
@ -1015,13 +1095,13 @@ def test_eager_and_sdpa_produce_identical_outputs(pipeline):
# Reload pipeline with SDPA
model_path = Path(__file__).parent / "dummy-chronos2-model"
pipeline_sdpa = BaseChronosPipeline.from_pretrained(
model_path, device_map="cpu", attn_implementation="sdpa", dtype=torch.float32
model_path, device_map="cpu", attn_implementation="sdpa", torch_dtype=torch.float32
)
# Note: the original pipeline fixture uses default attn_implementation which should be sdpa
# Force eager for comparison
pipeline_eager = BaseChronosPipeline.from_pretrained(
model_path, device_map="cpu", attn_implementation="eager", dtype=torch.float32
model_path, device_map="cpu", attn_implementation="eager", torch_dtype=torch.float32
)
# Test 1: Simple univariate input


@ -5,12 +5,21 @@ from pathlib import Path
import datasets
import fev
import numpy as np
import pandas as pd
import pytest
import torch
from chronos import BaseChronosPipeline, ChronosBoltPipeline
from chronos.chronos_bolt import InstanceNorm, Patch
from test.util import validate_tensor
from test.util import create_df, get_forecast_start_times, validate_tensor
DUMMY_MODEL_PATH = Path(__file__).parent / "dummy-chronos-bolt-model"
@pytest.fixture
def pipeline() -> ChronosBoltPipeline:
return BaseChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu")
def test_base_chronos_pipeline_loads_from_huggingface():
@ -20,11 +29,7 @@ def test_base_chronos_pipeline_loads_from_huggingface():
@pytest.mark.parametrize("torch_dtype", [torch.float32, torch.bfloat16])
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_predict(torch_dtype: torch.dtype, input_dtype: torch.dtype):
pipeline = ChronosBoltPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos-bolt-model",
device_map="cpu",
torch_dtype=torch_dtype,
)
pipeline = ChronosBoltPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=torch_dtype)
context = 10 * torch.rand(size=(4, 16)) + 10
context = context.to(dtype=input_dtype)
expected_num_quantiles = len(pipeline.quantiles)
@ -84,11 +89,7 @@ def test_pipeline_predict_quantiles(
prediction_length: int,
quantile_levels: list[float],
):
pipeline = ChronosBoltPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos-bolt-model",
device_map="cpu",
torch_dtype=torch_dtype,
)
pipeline = ChronosBoltPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=torch_dtype)
context = 10 * torch.rand(size=(4, 16)) + 10
context = context.to(dtype=input_dtype)
@ -127,11 +128,7 @@ def test_pipeline_predict_quantiles(
@pytest.mark.parametrize("model_dtype", [torch.float32, torch.bfloat16])
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_embed(model_dtype: torch.dtype, input_dtype: torch.dtype):
pipeline = ChronosBoltPipeline.from_pretrained(
Path(__file__).parent / "dummy-chronos-bolt-model",
device_map="cpu",
torch_dtype=model_dtype,
)
pipeline = ChronosBoltPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=model_dtype)
d_model = pipeline.model.config.d_model
context = 10 * torch.rand(size=(4, 16)) + 10
context = context.to(dtype=input_dtype)
@ -160,6 +157,88 @@ def test_pipeline_embed(model_dtype: torch.dtype, input_dtype: torch.dtype):
validate_tensor(loc_scale[1], shape=(1,), dtype=torch.float32)
@pytest.mark.parametrize(
"context_setup, expected_rows",
[
# Targets only
({}, 6), # 2 series * 3 predictions
# Different context lengths
(
{"series_ids": ["X", "Y", "Z"], "n_points": [10, 17, 56], "target_cols": ["custom_target"]},
9,
), # 3 series * 3 predictions
],
)
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
def test_predict_df_works_for_valid_inputs(pipeline, context_setup, expected_rows, freq):
prediction_length = 3
df = create_df(**context_setup, freq=freq)
forecast_start_times = get_forecast_start_times(df, freq)
series_ids = context_setup.get("series_ids", ["A", "B"])
target_columns = context_setup.get("target_cols", ["target"])
n_series = len(series_ids)
n_targets = len(target_columns)
result = pipeline.predict_df(df, target=target_columns[0], prediction_length=prediction_length)
assert len(result) == expected_rows
assert "item_id" in result.columns and np.all(
result["item_id"].to_numpy() == np.array(series_ids).repeat(n_targets * prediction_length)
)
assert "target_name" in result.columns and np.all(
result["target_name"].to_numpy() == np.tile(np.array(target_columns).repeat(prediction_length), n_series)
)
assert "timestamp" in result.columns and np.all(
result.groupby("item_id")["timestamp"].min().to_numpy() == pd.to_datetime(forecast_start_times).to_numpy()
)
assert "predictions" in result.columns
assert all(str(q) in result.columns for q in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline):
df = create_df()
# Make timestamps non-uniform for series A
df.loc[df["item_id"] == "A", "timestamp"] = [
"2023-01-01",
"2023-01-02",
"2023-01-04",
"2023-01-05",
"2023-01-06",
"2023-01-07",
"2023-01-08",
"2023-01-09",
"2023-01-10",
"2023-01-11",
]
with pytest.raises(ValueError, match="not infer frequency"):
pipeline.predict_df(df)
def test_predict_df_with_inconsistent_frequencies_raises_error(pipeline):
df = pd.DataFrame(
{
"item_id": ["A", "A", "A", "A", "A", "B", "B", "B", "B", "B"],
"timestamp": [
"2023-01-01",
"2023-01-02",
"2023-01-03",
"2023-01-04",
"2023-01-05",
"2023-01-01",
"2023-02-01",
"2023-03-01",
"2023-04-01",
"2023-05-01",
],
"target": [1.0] * 10,
}
)
with pytest.raises(ValueError, match="same frequency"):
pipeline.predict_df(df)
# The following tests have been taken from
# https://github.com/autogluon/autogluon/blob/f57beb26cb769c6e0d484a6af2b89eab8aee73a8/timeseries/tests/unittests/models/chronos/pipeline/test_chronos_bolt.py
# Author: Caner Turkmen <atturkm@amazon.com>
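The row-layout assertions in `test_predict_df_works_for_valid_inputs` above rely on `np.repeat` and `np.tile`: each `item_id` fills one contiguous block of `n_targets * prediction_length` rows, and inside every block the target names cycle in column order. The sketch below reproduces that expected layout in plain Python so the ordering is easier to see. The function name `expected_layout` is a hypothetical helper for illustration and is not part of the test suite or the library.

```python
from typing import List, Tuple


def expected_layout(
    series_ids: List[str], target_columns: List[str], prediction_length: int
) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: build the item_id / target_name columns the tests expect."""
    n_targets = len(target_columns)
    # Equivalent to np.array(series_ids).repeat(n_targets * prediction_length)
    item_ids = [sid for sid in series_ids for _ in range(n_targets * prediction_length)]
    # Equivalent to np.tile(np.array(target_columns).repeat(prediction_length), n_series)
    target_names = [
        target
        for _ in series_ids
        for target in target_columns
        for _ in range(prediction_length)
    ]
    return item_ids, target_names


item_ids, target_names = expected_layout(["A", "B"], ["sales", "revenue"], 3)
# Total rows: n_series * n_targets * prediction_length
print(len(item_ids))
```

With two series, two targets and a prediction length of 3, this gives 12 rows, which matches the `expected_rows = n_series * n_targets * prediction_length` formula used in the Chronos-2 test.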

test/test_df_utils.py Normal file

@ -0,0 +1,462 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
from chronos.df_utils import (
convert_df_input_to_list_of_dicts_input,
validate_df_inputs,
)
from test.util import create_df, create_future_df, get_forecast_start_times
# Tests for validate_df_inputs function
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
def test_validate_df_inputs_returns_correct_metadata_for_valid_inputs(freq):
"""Test that function returns validated dataframes, frequency, series lengths, and original order."""
# Create test data with 2 series
df = create_df(series_ids=["A", "B"], n_points=[10, 15], target_cols=["target"], freq=freq)
# Call validate_df_inputs
validated_df, validated_future_df, inferred_freq, series_lengths, original_order = validate_df_inputs(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
id_column="item_id",
timestamp_column="timestamp",
)
# Verify key return values
assert validated_future_df is None
assert inferred_freq is not None
assert series_lengths == [10, 15]
assert list(original_order) == ["A", "B"]
# Verify dataframe is sorted
assert validated_df["item_id"].iloc[0] == "A"
assert validated_df["item_id"].iloc[10] == "B"
def test_validate_df_inputs_casts_mixed_dtypes_correctly():
"""Test that numeric columns are cast to float32 and categorical/string/object columns are cast to category."""
# Create dataframe with mixed column types
df = pd.DataFrame(
{
"item_id": ["A"] * 10,
"timestamp": pd.date_range(end="2001-10-01", periods=10, freq="h"),
"target": np.random.randn(10), # numeric
"numeric_cov": np.random.randint(0, 10, 10), # integer numeric
"string_cov": ["cat1"] * 5 + ["cat2"] * 5, # string
"bool_cov": [True, False] * 5, # boolean
}
)
# Call validate_df_inputs
validated_df, _, _, _, _ = validate_df_inputs(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
)
# Verify dtypes after validation
assert validated_df["target"].dtype == np.float32
assert validated_df["numeric_cov"].dtype == np.float32
assert validated_df["string_cov"].dtype.name == "category"
assert validated_df["bool_cov"].dtype == np.float32 # booleans are cast to float32
def test_validate_df_inputs_raises_error_when_series_has_insufficient_data():
"""Test that ValueError is raised for series with < 3 data points."""
# Create dataframe with one series having only 2 points
df = create_df(series_ids=["A", "B"], n_points=[10, 2], target_cols=["target"], freq="h")
# Verify error is raised with series ID in message
with pytest.raises(ValueError, match=r"Every time series must have at least 3 data points.*series B"):
validate_df_inputs(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
)
def test_validate_df_inputs_raises_error_when_future_df_has_mismatched_series_ids():
"""Test that ValueError is raised when future_df has different series IDs than df."""
# Create df with series A and B
df = create_df(series_ids=["A", "B"], n_points=[10, 15], target_cols=["target"], freq="h")
# Create future_df with only series A
forecast_start_times = get_forecast_start_times(df, freq="h")
future_df = create_future_df(
forecast_start_times=[forecast_start_times[0]], series_ids=["A"], n_points=[5], covariates=None, freq="h"
)
# Verify appropriate error is raised
with pytest.raises(ValueError, match=r"future_df must contain the same time series IDs as df"):
validate_df_inputs(
df=df,
future_df=future_df,
target_columns=["target"],
prediction_length=5,
)
def test_validate_df_inputs_raises_error_when_future_df_has_incorrect_lengths():
"""Test that ValueError is raised when future_df lengths don't match prediction_length."""
# Create df with series A and B with a covariate
df = create_df(series_ids=["A", "B"], n_points=[10, 13], target_cols=["target"], covariates=["cov1"], freq="h")
# Create future_df with varying lengths per series (3 and 7 instead of 5)
forecast_start_times = get_forecast_start_times(df, freq="h")
future_df = create_future_df(
forecast_start_times=forecast_start_times,
series_ids=["A", "B"],
n_points=[3, 7], # incorrect lengths
covariates=["cov1"],
freq="h",
)
# Verify error message indicates which series have incorrect lengths
with pytest.raises(
ValueError, match=r"future_df must contain prediction_length=5 values for each series.*different lengths"
):
validate_df_inputs(
df=df,
future_df=future_df,
target_columns=["target"],
prediction_length=5,
)
# Tests for convert_df_input_to_list_of_dicts_input function
def test_convert_df_with_single_target_preserves_values():
"""Test conversion with single target column."""
df = create_df(series_ids=["A", "B"], n_points=[10, 12], target_cols=["target"], freq="h")
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
)
# Verify output list has correct length (one per series)
assert len(inputs) == 2
# Verify target arrays have correct shape and values match input
assert inputs[0]["target"].shape == (1, 10) # (n_targets=1, n_timesteps=10)
assert inputs[1]["target"].shape == (1, 12) # (n_targets=1, n_timesteps=12)
# Verify values are preserved
df_sorted = df.sort_values(["item_id", "timestamp"])
np.testing.assert_array_almost_equal(
inputs[0]["target"][0], df_sorted[df_sorted["item_id"] == "A"]["target"].values
)
np.testing.assert_array_almost_equal(
inputs[1]["target"][0], df_sorted[df_sorted["item_id"] == "B"]["target"].values
)
def test_convert_df_with_multiple_targets_preserves_values_and_shape():
"""Test conversion with multiple target columns."""
df = create_df(series_ids=["A", "B"], n_points=[10, 14], target_cols=["target1", "target2"], freq="h")
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
target_columns=["target1", "target2"],
prediction_length=5,
)
# Verify target arrays have shape (n_targets, n_timesteps)
assert inputs[0]["target"].shape == (2, 10)
assert inputs[1]["target"].shape == (2, 14)
# Verify all target values are preserved for both series
df_sorted = df.sort_values(["item_id", "timestamp"])
for i, series_id in enumerate(["A", "B"]):
series_data = df_sorted[df_sorted["item_id"] == series_id]
np.testing.assert_array_almost_equal(inputs[i]["target"][0], series_data["target1"].values)
np.testing.assert_array_almost_equal(inputs[i]["target"][1], series_data["target2"].values)
def test_convert_df_with_past_covariates_includes_them_in_output():
"""Test conversion with past covariates only."""
df = create_df(
series_ids=["A", "B"], n_points=[10, 16], target_cols=["target"], covariates=["cov1", "cov2"], freq="h"
)
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
)
# Verify output includes past_covariates dictionary
assert "past_covariates" in inputs[0]
assert "cov1" in inputs[0]["past_covariates"]
assert "cov2" in inputs[0]["past_covariates"]
# Verify covariate values match input for both series
assert inputs[0]["past_covariates"]["cov1"].shape == (10,)
assert inputs[0]["past_covariates"]["cov2"].shape == (10,)
assert inputs[1]["past_covariates"]["cov1"].shape == (16,)
assert inputs[1]["past_covariates"]["cov2"].shape == (16,)
# Verify no future_covariates key in output
assert "future_covariates" not in inputs[0]
def test_convert_df_with_past_and_future_covariates_includes_both():
"""Test conversion with both past and future covariates."""
df = create_df(series_ids=["A", "B"], n_points=[10, 18], target_cols=["target"], covariates=["cov1"], freq="h")
forecast_start_times = get_forecast_start_times(df, freq="h")
future_df = create_future_df(
forecast_start_times=forecast_start_times,
series_ids=["A", "B"],
n_points=[5, 5],
covariates=["cov1"],
freq="h",
)
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=future_df,
target_columns=["target"],
prediction_length=5,
)
# Verify output includes both past_covariates and future_covariates dictionaries for both series
assert "past_covariates" in inputs[0]
assert "future_covariates" in inputs[0]
assert "past_covariates" in inputs[1]
assert "future_covariates" in inputs[1]
# Verify all covariate values are preserved with correct shapes
assert inputs[0]["past_covariates"]["cov1"].shape == (10,)
assert inputs[0]["future_covariates"]["cov1"].shape == (5,)
assert inputs[1]["past_covariates"]["cov1"].shape == (18,)
assert inputs[1]["future_covariates"]["cov1"].shape == (5,)
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
def test_convert_df_generates_prediction_timestamps_with_correct_frequency(freq):
"""Test that prediction timestamps follow the inferred frequency."""
# Use multiple series with different lengths
df = create_df(series_ids=["A", "B", "C"], n_points=[10, 15, 12], target_cols=["target"], freq=freq)
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
)
# Verify timestamps for all series
for series_id in ["A", "B", "C"]:
# Verify timestamps start after last context timestamp
last_context_time = df[df["item_id"] == series_id]["timestamp"].max()
first_pred_time = prediction_timestamps[series_id][0]
assert first_pred_time > last_context_time
# Verify timestamps are evenly spaced according to frequency
pred_times = prediction_timestamps[series_id]
assert len(pred_times) == 5
inferred_freq = pd.infer_freq(pred_times)
assert inferred_freq is not None
def test_convert_df_skips_validation_when_disabled():
"""Test that validate_inputs=False skips validation."""
df = create_df(series_ids=["A", "B"], n_points=[10, 12], target_cols=["target"], freq="h")
# Mock validate_df_inputs to verify it's not called when validation is disabled
with patch("chronos.df_utils.validate_df_inputs") as mock_validate:
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
validate_inputs=False,
)
# Verify validate_df_inputs was not called
mock_validate.assert_not_called()
# Verify conversion still works
assert len(inputs) == 2
def test_convert_df_preserves_all_values_with_random_inputs():
"""Generate random dataframe and verify all values are preserved exactly."""
# Generate random parameters
n_series = np.random.randint(2, 5)
n_targets = np.random.randint(1, 4)
n_past_only_covariates = np.random.randint(1, 3)
n_future_covariates = np.random.randint(1, 3)
prediction_length = 5
series_ids = [f"series_{i}" for i in range(n_series)]
n_points = [np.random.randint(10, 20) for _ in range(n_series)]
target_cols = [f"target_{i}" for i in range(n_targets)]
past_only_covariates = [f"past_cov_{i}" for i in range(n_past_only_covariates)]
future_covariates = [f"future_cov_{i}" for i in range(n_future_covariates)]
all_covariates = past_only_covariates + future_covariates
# Create dataframe with all covariates
df = create_df(
series_ids=series_ids, n_points=n_points, target_cols=target_cols, covariates=all_covariates, freq="h"
)
# Create future_df with only future covariates (not past-only ones)
forecast_start_times = get_forecast_start_times(df, freq="h")
future_df = create_future_df(
forecast_start_times=forecast_start_times,
series_ids=series_ids,
n_points=[prediction_length] * n_series,
covariates=future_covariates,
freq="h",
)
# Convert to list-of-dicts format
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=future_df,
target_columns=target_cols,
prediction_length=prediction_length,
)
# Verify all target values are preserved exactly
df_sorted = df.sort_values(["item_id", "timestamp"])
for i, series_id in enumerate(series_ids):
series_data = df_sorted[df_sorted["item_id"] == series_id]
assert inputs[i]["target"].shape == (n_targets, n_points[i])
for j, target_col in enumerate(target_cols):
np.testing.assert_array_almost_equal(inputs[i]["target"][j], series_data[target_col].values)
# Verify all past covariate values are preserved (both past-only and future covariates)
for i, series_id in enumerate(series_ids):
series_data = df_sorted[df_sorted["item_id"] == series_id]
assert "past_covariates" in inputs[i]
for cov in all_covariates:
np.testing.assert_array_almost_equal(inputs[i]["past_covariates"][cov], series_data[cov].values)
# Verify only future covariates are in future_covariates (not past-only ones)
future_df_sorted = future_df.sort_values(["item_id", "timestamp"])
for i, series_id in enumerate(series_ids):
series_future_data = future_df_sorted[future_df_sorted["item_id"] == series_id]
assert "future_covariates" in inputs[i]
# Only future covariates should be present
assert set(inputs[i]["future_covariates"].keys()) == set(future_covariates)
for cov in future_covariates:
np.testing.assert_array_almost_equal(inputs[i]["future_covariates"][cov], series_future_data[cov].values)
# Verify output structure is correct
assert len(inputs) == n_series
assert list(original_order) == series_ids
assert len(prediction_timestamps) == n_series
def test_convert_df_with_freq_and_validate_inputs_raises_error():
"""Test that providing freq with validate_inputs=True raises ValueError."""
df = create_df(series_ids=["A", "B"], n_points=[10, 12], target_cols=["target"], freq="h")
with pytest.raises(ValueError, match="freq can only be provided when validate_inputs=False"):
convert_df_input_to_list_of_dicts_input(
df=df,
future_df=None,
target_columns=["target"],
prediction_length=5,
freq="h",
validate_inputs=True,
)
@pytest.mark.parametrize("use_future_df", [True, False])
def test_convert_df_with_freq_and_validate_inputs_false(use_future_df):
"""Test that freq works with validate_inputs=False."""
df = create_df(series_ids=["A", "B"], n_points=[10, 12], target_cols=["target"], covariates=["cov1"], freq="h")
prediction_length = 5
future_df = None
if use_future_df:
forecast_start_times = get_forecast_start_times(df, freq="h")
future_df = create_future_df(
forecast_start_times=forecast_start_times,
series_ids=["A", "B"],
n_points=[prediction_length, prediction_length],
covariates=["cov1"],
freq="h",
)
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=future_df,
target_columns=["target"],
prediction_length=prediction_length,
freq="h",
validate_inputs=False,
)
assert len(inputs) == 2
assert len(prediction_timestamps) == 2
for series_id in ["A", "B"]:
assert len(prediction_timestamps[series_id]) == prediction_length
@pytest.mark.parametrize("use_future_df", [True, False])
def test_convert_df_with_mismatched_freq_uses_user_provided_freq(use_future_df):
"""Test that user-provided freq overrides data frequency when validate_inputs=False."""
# Create data with hourly frequency
data_freq = "h"
df = create_df(
series_ids=["A", "B"], n_points=[10, 12], target_cols=["target"], covariates=["cov1"], freq=data_freq
)
prediction_length = 5
# User provides daily frequency (different from data)
user_freq = "D"
future_df = None
if use_future_df:
# Create future_df with hourly frequency (matching data, not user freq)
forecast_start_times = get_forecast_start_times(df, freq=data_freq)
future_df = create_future_df(
forecast_start_times=forecast_start_times,
series_ids=["A", "B"],
n_points=[prediction_length, prediction_length],
covariates=["cov1"],
freq=data_freq,
)
inputs, original_order, prediction_timestamps = convert_df_input_to_list_of_dicts_input(
df=df,
future_df=future_df,
target_columns=["target"],
prediction_length=prediction_length,
freq=user_freq,
validate_inputs=False,
)
# Prediction should work
assert len(inputs) == 2
assert len(prediction_timestamps) == 2
# Forecast timestamps should use user-provided freq (daily), not data freq (hourly)
for series_id in ["A", "B"]:
pred_ts = prediction_timestamps[series_id]
assert len(pred_ts) == prediction_length
# Verify the frequency matches user-provided freq
inferred_freq = pd.infer_freq(pred_ts)
assert inferred_freq == user_freq


@ -1,5 +1,8 @@
from typing import Optional, Tuple
import time
from typing import Callable, Optional, Tuple
import numpy as np
import pandas as pd
import torch
@ -9,3 +12,47 @@ def validate_tensor(a: torch.Tensor, shape: Tuple[int, ...], dtype: Optional[tor
if dtype is not None:
assert a.dtype == dtype
def create_df(series_ids=["A", "B"], n_points=[10, 10], target_cols=["target"], covariates=None, freq="h"):
"""Helper to create test context DataFrames."""
series_dfs = []
for series_id, length in zip(series_ids, n_points):
series_data = {"item_id": series_id, "timestamp": pd.date_range(end="2001-10-01", periods=length, freq=freq)}
for target_col in target_cols:
series_data[target_col] = np.random.randn(length)
if covariates:
for cov in covariates:
series_data[cov] = np.random.randn(length)
series_dfs.append(pd.DataFrame(series_data))
return pd.concat(series_dfs, ignore_index=True)
def create_future_df(forecast_start_times: list, series_ids=["A", "B"], n_points=[5, 5], covariates=None, freq="h"):
"""Helper to create test future DataFrames."""
series_dfs = []
for series_id, length, start in zip(series_ids, n_points, forecast_start_times):
series_data = {"item_id": series_id, "timestamp": pd.date_range(start=start, periods=length, freq=freq)}
if covariates:
for cov in covariates:
series_data[cov] = np.random.randn(length)
series_dfs.append(pd.DataFrame(series_data))
return pd.concat(series_dfs, ignore_index=True)
def get_forecast_start_times(df, freq="h"):
context_end_times = df.groupby("item_id")["timestamp"].max()
forecast_start_times = [pd.date_range(end_time, periods=2, freq=freq)[-1] for end_time in context_end_times]
return forecast_start_times
def timeout_callback(seconds: float | None) -> Callable:
"""Return a callback object that raises an exception if time limit is exceeded."""
start_time = time.monotonic()
def callback() -> None:
if seconds is not None and time.monotonic() - start_time > seconds:
raise TimeoutError("time limit exceeded")
return callback
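The `timeout_callback` helper above is what the new test passes as `after_batch` to `pipeline.predict_df`. The sketch below shows the same pattern in isolation, using only the standard library. The `run_batches` loop is a hypothetical stand-in for a batched prediction loop, assumed here to call the callback once after each batch. It is not the pipeline's actual implementation.

```python
import time
from typing import Callable, Optional


def timeout_callback(seconds: Optional[float]) -> Callable[[], None]:
    """Same shape as the helper in test/util.py: raise once the time budget is spent."""
    start_time = time.monotonic()

    def callback() -> None:
        if seconds is not None and time.monotonic() - start_time > seconds:
            raise TimeoutError("time limit exceeded")

    return callback


def run_batches(n_batches: int, after_batch: Callable[[], None]) -> int:
    """Hypothetical stand-in for a batched prediction loop (an assumption for illustration)."""
    done = 0
    for _ in range(n_batches):
        time.sleep(0.01)  # pretend to process one batch
        done += 1
        after_batch()  # raises TimeoutError once the budget is exceeded
    return done


# No limit: every batch runs.
print(run_batches(3, timeout_callback(None)))

# Tight limit: the loop is interrupted between batches.
try:
    run_batches(100, timeout_callback(0.02))
except TimeoutError as err:
    print(f"stopped early: {err}")
```

Because the clock starts when `timeout_callback` is called rather than when the first batch begins, the callback should be created immediately before the call it is meant to bound, as the test does.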