chronos-forecasting/test/test_chronos2.py
Oleksandr Shchur f951d9aefa
Rename task -> input in dataset.py (#467)
*Issue #, if available:*

*Description of changes:*

Rename all variables and methods that refer to "task" in `dataset.py` to
use `input` instead:
- `PreparedTask` → `PreparedInput`
- `self.tasks` → `self.inputs`
- `prepare_tasks` → `prepare_inputs`
- `validate_and_prepare_single_dict_task` →
`validate_and_prepare_single_dict_input`
- All `task_` prefixed local variables renamed (e.g., `task_target` →
`target`, `task_context` → `context`, `task_past_tensor` →
`past_tensor`, etc.)


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
2026-02-19 13:30:08 +01:00

1169 lines
48 KiB
Python

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import json
from copy import deepcopy
from pathlib import Path
import datasets
import fev
import numpy as np
import pandas as pd
import pytest
import torch
from chronos import BaseChronosPipeline, Chronos2Pipeline
from chronos.chronos2.config import Chronos2CoreConfig
from chronos.chronos2.layers import MHA
from chronos.df_utils import convert_df_input_to_list_of_dicts_input
from test.util import create_df, create_future_df, get_forecast_start_times, validate_tensor, timeout_callback
DUMMY_MODEL_PATH = Path(__file__).parent / "dummy-chronos2-model"

# Read the dummy checkpoint's config once at import time so that parametrized cases can size their
# expected outputs by the number of quantiles the model emits. JSON text is UTF-8, so decode it
# explicitly instead of relying on the platform's locale-dependent default encoding.
with open(DUMMY_MODEL_PATH / "config.json", encoding="utf-8") as fp:
    config = json.load(fp)

DEFAULT_MODEL_NUM_QUANTILES = len(config["chronos_config"]["quantiles"])
@pytest.fixture
def pipeline() -> Chronos2Pipeline:
    """Load the dummy Chronos-2 checkpoint stored next to this module, placed on CPU."""
    dummy_pipeline = BaseChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu")
    return dummy_pipeline
def test_base_chronos2_pipeline_loads_from_s3():
    """Smoke test: the Chronos-2 checkpoint at the S3 URI loads through the base pipeline class."""
    s3_uri = "s3://autogluon/chronos-2"
    BaseChronosPipeline.from_pretrained(s3_uri, device_map="cpu")
def test_base_chronos2_pipeline_loads_from_hf():
    """Smoke test: the Chronos-2 checkpoint with the Hugging Face Hub id loads through the base class."""
    hub_model_id = "amazon/chronos-2"
    BaseChronosPipeline.from_pretrained(hub_model_id, device_map="cpu")
def test_chronos2_lora_pipeline_loads_from_disk():
    """Smoke test: the local `dummy-chronos2-lora` checkpoint loads directly via Chronos2Pipeline."""
    lora_checkpoint_dir = Path(__file__).parent / "dummy-chronos2-lora"
    Chronos2Pipeline.from_pretrained(lora_checkpoint_dir, device_map="cpu")
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 7, [(1, DEFAULT_MODEL_NUM_QUANTILES, 7)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27, [(3, DEFAULT_MODEL_NUM_QUANTILES, 27)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            68,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
            ],
        ),
        # Homogeneous univariate list of dicts with target only
        (
            [{"target": torch.rand(10)}, {"target": torch.rand(110)}, {"target": torch.rand(17)}],
            5,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 5)] * 3,
        ),
        # Homogeneous multivariate list of dicts with target only
        (
            [{"target": torch.rand(2, 10)}, {"target": torch.rand(2, 110)}, {"target": torch.rand(2, 17)}],
            16,
            [(2, DEFAULT_MODEL_NUM_QUANTILES, 16)] * 3,
        ),
        # Homogeneous list of dicts with target and past-only covariates
        (
            [
                {"target": torch.rand(10), "past_covariates": {"feat_1": torch.rand(10)}},
                {"target": torch.rand(110), "past_covariates": {"feat_1": torch.rand(110)}},
                {"target": torch.rand(17), "past_covariates": {"feat_1": torch.rand(17)}},
            ],
            10,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 10)] * 3,
        ),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        ),
        # Heterogeneous list of dicts with different mix of tasks
        (
            [
                {
                    "target": torch.rand(100),
                    "past_covariates": {"temperature": torch.rand(100), "precipitation": torch.rand(100)},
                    "future_covariates": {"temperature": torch.rand(200)},
                },
                {"target": torch.rand(2, 150), "past_covariates": {"wind_speed": torch.rand(150)}},
                {
                    "target": np.random.rand(150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {
                    "target": np.random.rand(3, 150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {"target": torch.rand(1, 150)},
            ],
            200,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (3, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
            ],
        ),
    ],
)
def test_when_input_is_valid_then_pipeline_can_predict(pipeline, inputs, prediction_length, expected_output_shapes):
    """Every supported input layout yields one float32 forecast tensor per series, shaped
    (n_variates, n_model_quantiles, prediction_length)."""
    forecasts = pipeline.predict(inputs, prediction_length=prediction_length)

    assert isinstance(forecasts, list)
    assert len(forecasts) == len(expected_output_shapes)
    for forecast, shape in zip(forecasts, expected_output_shapes):
        validate_tensor(forecast, shape, dtype=torch.float32)
@pytest.mark.parametrize(
    "inputs, prediction_length, quantile_levels, expected_output_shapes",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 7, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], [(1, 7, 9)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8], [(3, 27, 8)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            68,
            [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            [(1, 68, 10), (2, 68, 10), (1, 68, 10)],
        ),
        # Homogeneous univariate list of dicts with target only
        (
            [{"target": torch.rand(10)}, {"target": torch.rand(110)}, {"target": torch.rand(17)}],
            5,
            [0.1, 0.5, 0.9],
            [(1, 5, 3)] * 3,
        ),
        # Homogeneous multivariate list of dicts with target only
        (
            [{"target": torch.rand(2, 10)}, {"target": torch.rand(2, 110)}, {"target": torch.rand(2, 17)}],
            16,
            [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99],
            [(2, 16, 11)] * 3,
        ),
        # Homogeneous list of dicts with target and past-only covariates
        (
            [
                {"target": torch.rand(10), "past_covariates": {"feat_1": torch.rand(10)}},
                {"target": torch.rand(110), "past_covariates": {"feat_1": torch.rand(110)}},
                {"target": torch.rand(17), "past_covariates": {"feat_1": torch.rand(17)}},
            ],
            10,
            [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            [(1, 10, 9)] * 3,
        ),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99],
            [(1, 15, 13)] * 3,
        ),
        # Heterogeneous list of dicts with different mix of tasks
        (
            [
                {
                    "target": torch.rand(100),
                    "past_covariates": {"temperature": torch.rand(100), "precipitation": torch.rand(100)},
                    "future_covariates": {"temperature": torch.rand(200)},
                },
                {"target": torch.rand(2, 150), "past_covariates": {"wind_speed": torch.rand(150)}},
                {
                    "target": np.random.rand(150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {
                    "target": np.random.rand(3, 150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {"target": torch.rand(1, 150)},
            ],
            200,
            [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99],
            [(1, 200, 11), (2, 200, 11), (1, 200, 11), (3, 200, 11), (1, 200, 11)],
        ),
    ],
)
def test_when_input_is_valid_then_pipeline_can_predict_quantiles(
    pipeline, inputs, prediction_length, quantile_levels, expected_output_shapes
):
    """`predict_quantiles` returns, per series, a (n_variates, prediction_length, n_requested_quantiles)
    float32 tensor plus a mean tensor with the trailing quantile axis dropped."""
    quantile_forecasts, mean_forecasts = pipeline.predict_quantiles(
        inputs, prediction_length=prediction_length, quantile_levels=quantile_levels
    )

    n_expected = len(expected_output_shapes)
    assert isinstance(quantile_forecasts, list) and len(quantile_forecasts) == n_expected
    assert isinstance(mean_forecasts, list) and len(mean_forecasts) == n_expected
    for q_forecast, mean_forecast, q_shape in zip(quantile_forecasts, mean_forecasts, expected_output_shapes):
        validate_tensor(q_forecast, q_shape, dtype=torch.float32)
        mean_shape = q_shape[:-1]  # same as the quantile tensor, minus the quantile axis
        validate_tensor(mean_forecast, mean_shape, dtype=torch.float32)
@pytest.mark.parametrize(
    "inputs, error_match_string",
    [
        (torch.rand(16), "should be 3-d with shape"),
        (torch.rand(4, 3), "should be 3-d with shape"),
        ([torch.rand(1, 2, 100), torch.rand(120)], "the elements should either be 1-d"),
        ([{"target": torch.rand(10)}, {"target": torch.rand(1, 2, 17), "extra_key": []}], "Found invalid keys"),
        ([{"target": torch.rand(10)}, {"target": torch.rand(1, 2, 17)}], "`target` should either be 1-d with shape"),
        ([{"target": torch.rand(10), "past_covariates": torch.rand(10)}], "Found invalid type for `past_covariates`"),
        (
            [
                {"target": torch.rand(10), "past_covariates": {"feat_1": torch.rand(10)}},
                {"target": torch.rand(17), "past_covariates": {"feat_1": torch.rand(10)}},
            ],
            "`past_covariates` must be 1-d with length",
        ),
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat": torch.rand(10)},
                    "future_covariates": torch.rand(10),
                }
            ],
            "Found invalid type for `future_covariates`",
        ),
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat": torch.rand(10)},
                    "future_covariates": {"feat": torch.rand(10), "extra": torch.rand(10)},
                }
            ],
            "Expected keys in `future_covariates`",
        ),
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat": torch.rand(10)},
                    "future_covariates": {"feat": torch.rand(17)},
                }
            ],
            "`future_covariates` must be 1-d with length",
        ),
    ],
)
def test_when_input_is_invalid_then_predict_raises_value_error(pipeline, inputs, error_match_string):
    """Malformed inputs are rejected by `predict` with a ValueError whose message matches the case."""
    with pytest.raises(ValueError, match=error_match_string):
        pipeline.predict(inputs, prediction_length=10)
@pytest.mark.parametrize("dtype", [torch.float32, torch.bfloat16])
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_predict_can_handle_different_model_and_input_dtypes(dtype: torch.dtype, input_dtype: torch.dtype):
    """Forecasts come back as float32 regardless of the model's weight dtype or the context's dtype."""
    # Same dummy checkpoint as the `pipeline` fixture, but loaded with a non-default weight dtype.
    pipeline = BaseChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=dtype)
    # Values in [10, 20) so that a cast to int64 still produces a usable series.
    context = (10 * torch.rand(size=(4, 3, 16)) + 10).to(dtype=input_dtype)
    expected_num_quantiles = len(pipeline.quantiles)

    # input: tensor of shape (batch_size, n_variates, context_length)
    forecasts = pipeline.predict(context, prediction_length=7)

    # Guard against the per-item loop below passing vacuously on an empty result.
    assert isinstance(forecasts, list) and len(forecasts) == context.shape[0]
    for forecast in forecasts:
        validate_tensor(forecast, (3, expected_num_quantiles, 7), dtype=torch.float32)
@pytest.mark.parametrize(
    "inputs, expected_output_shapes",
    [
        # NOTE: d_model for the dummy model is 6
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), [(1, 3, 6)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), [(3, 5, 6)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            [(1, 12, 6), (2, 12, 6), (1, 12, 6)],
        ),
    ],
)
def test_when_input_is_valid_then_pipeline_can_embed(pipeline, inputs, expected_output_shapes):
    """`embed` returns one float32 embedding per series plus an indexable pair of (n_variates, 1)
    tensors per series (presumably the loc/scale used for normalization)."""
    embeds, loc_scales = pipeline.embed(inputs)

    n_expected = len(expected_output_shapes)
    assert isinstance(embeds, list) and len(embeds) == n_expected and len(loc_scales) == n_expected
    for embed, loc_scale, expected_shape in zip(embeds, loc_scales, expected_output_shapes):
        validate_tensor(embed, expected_shape, dtype=torch.float32)
        per_variate_shape = (expected_shape[0], 1)
        loc, scale = loc_scale[0], loc_scale[1]
        validate_tensor(loc, per_variate_shape, dtype=torch.float32)
        validate_tensor(scale, per_variate_shape, dtype=torch.float32)
@pytest.mark.parametrize(
    "task_kwargs",
    [
        {"dataset_path": "autogluon/chronos_datasets", "dataset_config": "monash_m1_yearly", "horizon": 8},
        {
            "dataset_path": "autogluon/chronos_datasets",
            "dataset_config": "monash_m1_yearly",
            "horizon": 8,
            "eval_metric": "WQL",
            "quantile_levels": [0.1, 0.2],
        },
        {
            "dataset_path": "autogluon/fev_datasets",
            "dataset_config": "ETT_1H",
            "horizon": 27,
            "target": ["HULL", "HUFL", "OT"],
        },
        {
            "dataset_path": "autogluon/fev_datasets",
            "dataset_config": "ETT_1H",
            "horizon": 34,
            "target": "OT",
            "past_dynamic_columns": ["HULL", "HUFL"],
        },
        {
            "dataset_path": "autogluon/fev_datasets",
            "dataset_config": "ETT_1H",
            "horizon": 34,
            "target": "OT",
            "past_dynamic_columns": ["HULL", "HUFL"],
            "known_dynamic_columns": ["LULL"],
        },
    ],
)
def test_pipeline_can_evaluate_on_dummy_fev_task(pipeline, task_kwargs):
    """`predict_fev` output (one DatasetDict per window plus a float timing) can be scored by fev."""
    task = fev.Task(**task_kwargs)
    predictions_per_window, inference_time_s = pipeline.predict_fev(task)

    assert isinstance(inference_time_s, float)
    assert isinstance(predictions_per_window, list)
    assert all(isinstance(window_preds, datasets.DatasetDict) for window_preds in predictions_per_window)

    summary = task.evaluation_summary(predictions_per_window, model_name="chronos-2")
    assert isinstance(summary["test_error"], float)
@pytest.mark.parametrize(
    "context_setup, future_setup",
    [
        # Targets only
        ({}, None),
        # Multiple targets with different context lengths
        ({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
        # With past covariates
        ({"covariates": ["cov1"]}, None),
        # With future covariates
        ({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates
        ({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates and different series order
        (
            {"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
            {"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
        ),
    ],
)
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
@pytest.mark.parametrize("prediction_length", [1, 4])
@pytest.mark.parametrize("validate_inputs", [True, False])
def test_predict_df_works_for_valid_inputs(
    pipeline, context_setup, future_setup, freq, validate_inputs, prediction_length
):
    """`predict_df` emits one row per (series, target, step), grouped by series and then by target
    in input order, with prediction and quantile columns."""
    df = create_df(**context_setup, freq=freq)
    forecast_start_times = get_forecast_start_times(df, freq)

    future_df = None
    if future_setup:
        future_ids = future_setup.get("series_ids", ["A", "B"])
        future_df = create_future_df(
            forecast_start_times,
            **{**future_setup, "n_points": [prediction_length] * len(future_ids)},
            freq=freq,
        )

    series_ids = context_setup.get("series_ids", ["A", "B"])
    target_columns = context_setup.get("target_cols", ["target"])
    n_series, n_targets = len(series_ids), len(target_columns)

    result = pipeline.predict_df(
        df,
        future_df=future_df,
        target=target_columns,
        prediction_length=prediction_length,
        validate_inputs=validate_inputs,
    )

    assert len(result) == n_series * n_targets * prediction_length

    expected_item_ids = np.array(series_ids).repeat(n_targets * prediction_length)
    assert "item_id" in result.columns and np.all(result["item_id"].to_numpy() == expected_item_ids)

    expected_target_names = np.tile(np.array(target_columns).repeat(prediction_length), n_series)
    assert "target_name" in result.columns and np.all(result["target_name"].to_numpy() == expected_target_names)

    assert "timestamp" in result.columns
    first_forecast_timestamps = result.groupby("item_id")["timestamp"].min().to_numpy()
    assert np.all(first_forecast_timestamps == pd.to_datetime(forecast_start_times).to_numpy())

    assert "predictions" in result.columns
    default_quantile_levels = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
    assert all(str(level) in result.columns for level in default_quantile_levels)
@pytest.mark.parametrize(
    "context_data, error_match",
    [
        # Missing timestamp column
        ({"item_id": ["A"], "target": [1.0]}, "df does not contain all"),
        # Insufficient data points
        ({"item_id": ["A"], "timestamp": ["2023-01-01"], "target": [1.0]}, "must have at least 3 data"),
    ],
)
def test_predict_df_df_validation_errors(pipeline, context_data, error_match):
    """Invalid context DataFrames are rejected by `predict_df` with a matching ValueError."""
    context_df = pd.DataFrame(context_data)
    with pytest.raises(ValueError, match=error_match):
        pipeline.predict_df(context_df)
@pytest.mark.parametrize(
    "future_data, error_match",
    [
        # Missing timestamp column
        ({"item_id": ["A"], "cov1": [1.0]}, "future_df does not contain all"),
        # target in future_df
        (
            {"item_id": ["A"], "timestamp": ["2023-01-01"], "cov1": [1.0], "target": [1.0]},
            "future_df cannot contain target",
        ),
        # Extra columns in future_df
        (
            {"item_id": ["A"], "timestamp": ["2023-01-01"], "cov1": [1.0], "cov2": [1.0]},
            "future_df cannot contain columns not present",
        ),
    ],
)
def test_predict_df_future_df_validation_errors(pipeline, future_data, error_match):
    """Malformed future_df inputs are rejected by `predict_df` with a matching ValueError."""
    context_df = create_df(series_ids=["A", "B"], covariates=["cov1"], freq="h")
    bad_future_df = pd.DataFrame(future_data)
    with pytest.raises(ValueError, match=error_match):
        pipeline.predict_df(context_df, future_df=bad_future_df)
@pytest.mark.parametrize("validate_inputs", [True, False])
def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline, validate_inputs):
    """A gap in one series' timestamps makes frequency inference fail, with or without validation."""
    df = create_df()
    # Skip 2023-01-03 for series "A" (the first series) so its timestamps are no longer evenly spaced.
    irregular_timestamps = ["2023-01-01", "2023-01-02"] + [f"2023-01-{day:02d}" for day in range(4, 12)]
    df.loc[df["item_id"] == "A", "timestamp"] = irregular_timestamps
    with pytest.raises((ValueError, AssertionError), match="not infer frequency"):
        pipeline.predict_df(df, validate_inputs=validate_inputs)
def test_predict_df_with_inconsistent_frequencies_raises_error(pipeline):
    """Mixing a daily series ("A") with a monthly series ("B") in one df is rejected."""
    daily_timestamps = [f"2023-01-{day:02d}" for day in range(1, 6)]
    monthly_timestamps = [f"2023-{month:02d}-01" for month in range(1, 6)]
    df = pd.DataFrame(
        {
            "item_id": ["A"] * 5 + ["B"] * 5,
            "timestamp": daily_timestamps + monthly_timestamps,
            "target": [1.0] * 10,
        }
    )
    with pytest.raises(ValueError, match="same frequency"):
        pipeline.predict_df(df)
def test_predict_df_with_future_df_missing_series_raises_error(pipeline):
    """A future_df that omits one of the context series is rejected by `predict_df`."""
    df = create_df(series_ids=["A", "B"], covariates=["cov1"])
    # future_df only covers series "A"; series "B" from the context df is missing.
    future_df = create_future_df(get_forecast_start_times(df), series_ids=["A"], covariates=["cov1"])
    with pytest.raises(ValueError, match="same time series IDs"):
        pipeline.predict_df(df, future_df=future_df)
def test_predict_df_with_future_df_with_different_freq_raises_error(pipeline):
    """A daily future_df cannot extend an hourly context df."""
    hourly_df = create_df(series_ids=["A", "B"], covariates=["cov1"], freq="h")
    start_times = get_forecast_start_times(hourly_df)
    daily_future_df = create_future_df(
        start_times, series_ids=["A", "B"], n_points=[3, 3], covariates=["cov1"], freq="D"
    )
    with pytest.raises(ValueError, match="future_df timestamps do not match"):
        pipeline.predict_df(hourly_df, future_df=daily_future_df, prediction_length=3)
def test_predict_df_with_future_df_with_different_lengths_raises_error(pipeline):
    """A future_df whose series "B" spans 7 steps is rejected when prediction_length is 3."""
    context_df = create_df(series_ids=["A", "B"], covariates=["cov1"])
    ragged_future_df = create_future_df(
        get_forecast_start_times(context_df),
        series_ids=["A", "B"],
        n_points=[3, 7],
        covariates=["cov1"],
    )
    with pytest.raises(ValueError, match="future_df must contain prediction"):
        pipeline.predict_df(context_df, future_df=ragged_future_df, prediction_length=3)
@pytest.mark.parametrize(
    "context_setup, future_setup",
    [
        # Targets only
        ({}, None),
        # Multiple targets with different context lengths
        ({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
        # With past covariates
        ({"covariates": ["cov1"]}, None),
        # With future covariates
        ({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates
        ({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates and different series order
        (
            {"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
            {"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
        ),
    ],
)
@pytest.mark.parametrize("prediction_length", [1, 4])
def test_predict_df_outputs_different_results_with_cross_learning_enabled(
    pipeline, context_setup, future_setup, prediction_length
):
    """Toggling `cross_learning` in `predict_df` changes the forecasts for these multi-series inputs.

    Every case has at least two series (the default ids are ["A", "B"]), so enabling cross-learning
    (presumably letting series in the batch inform each other) should alter the predictions.
    """
    freq = "h"
    df = create_df(**context_setup, freq=freq)
    forecast_start_times = get_forecast_start_times(df, freq)
    if future_setup:
        future_series_ids = future_setup.get("series_ids", ["A", "B"])
        future_setup_with_n_points = {**future_setup, "n_points": [prediction_length] * len(future_series_ids)}
        future_df = create_future_df(forecast_start_times, **future_setup_with_n_points, freq=freq)
    else:
        future_df = None
    target_columns = context_setup.get("target_cols", ["target"])

    # Both calls must differ only in `cross_learning`.
    shared_kwargs = dict(future_df=future_df, target=target_columns, prediction_length=prediction_length)
    result_with_cross_learning = pipeline.predict_df(df, **shared_kwargs, cross_learning=True)
    result_without_cross_learning = pipeline.predict_df(df, **shared_kwargs, cross_learning=False)

    assert not np.array_equal(result_with_cross_learning["predictions"], result_without_cross_learning["predictions"])
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 7, [(1, DEFAULT_MODEL_NUM_QUANTILES, 7)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27, [(3, DEFAULT_MODEL_NUM_QUANTILES, 27)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            68,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
            ],
        ),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        ),
        # Heterogeneous list of dicts with different mix of tasks
        (
            [
                {
                    "target": torch.rand(1000),
                    "past_covariates": {"temperature": torch.rand(1000), "precipitation": torch.rand(1000)},
                    "future_covariates": {"temperature": torch.rand(200)},
                },
                {"target": torch.rand(2, 150), "past_covariates": {"wind_speed": torch.rand(150)}},
                {
                    "target": np.random.rand(150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {
                    "target": np.random.rand(3, 150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {"target": torch.rand(1, 150)},
            ],
            200,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (3, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
            ],
        ),
    ],
)
@pytest.mark.parametrize("finetune_mode", ["full", "lora"])
def test_when_input_is_valid_then_pipeline_can_be_finetuned(
    pipeline, inputs, prediction_length, expected_output_shapes, finetune_mode
):
    """Fine-tuning (full or LoRA mode) yields a pipeline whose forecasts differ from the original's,
    while the pipeline `fit` was called on keeps producing its original forecasts."""
    baseline_before = pipeline.predict(inputs, prediction_length=prediction_length)

    finetuned_pipeline = pipeline.fit(
        inputs,
        prediction_length=prediction_length,
        num_steps=5,
        min_past=1,
        batch_size=32,
        finetune_mode=finetune_mode,
    )

    finetuned_outputs = finetuned_pipeline.predict(inputs, prediction_length=prediction_length)
    baseline_after = pipeline.predict(inputs, prediction_length=prediction_length)

    assert isinstance(finetuned_outputs, list)
    assert len(finetuned_outputs) == len(expected_output_shapes)
    for before, tuned, after, shape in zip(baseline_before, finetuned_outputs, baseline_after, expected_output_shapes):
        validate_tensor(tuned, shape, dtype=torch.float32)
        # The source pipeline must not be mutated by `fit` ...
        assert torch.allclose(before, after)
        # ... while the fine-tuned copy must actually have moved, without producing NaNs.
        assert not torch.allclose(before, tuned)
        assert not torch.isnan(tuned).any()
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        )
    ],
)
def test_pipeline_can_be_finetuned_with_validation(pipeline, inputs, prediction_length, expected_output_shapes):
    """Fine-tuning with `validation_inputs` and periodic evaluation still yields a changed, NaN-free
    fine-tuned pipeline and leaves the original pipeline's forecasts unchanged."""
    baseline_before = pipeline.predict(inputs, prediction_length=prediction_length)

    # The training inputs double as the validation set; eval and logging run every 10 of 20 steps.
    finetuned_pipeline = pipeline.fit(
        inputs,
        prediction_length=prediction_length,
        validation_inputs=inputs,
        num_steps=20,
        min_past=1,
        eval_steps=10,
        logging_steps=10,
        batch_size=32,
    )

    finetuned_outputs = finetuned_pipeline.predict(inputs, prediction_length=prediction_length)
    baseline_after = pipeline.predict(inputs, prediction_length=prediction_length)

    assert isinstance(finetuned_outputs, list)
    assert len(finetuned_outputs) == len(expected_output_shapes)
    for before, tuned, after, shape in zip(baseline_before, finetuned_outputs, baseline_after, expected_output_shapes):
        validate_tensor(tuned, shape, dtype=torch.float32)
        assert torch.allclose(before, after)
        assert not torch.allclose(before, tuned)
        assert not torch.isnan(tuned).any()
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        )
    ],
)
@pytest.mark.parametrize("ft_future_values", [None, np.array([]), torch.zeros(0)])
def test_pipeline_can_be_finetuned_with_empty_future_covariates(
    pipeline, inputs, prediction_length, expected_output_shapes, ft_future_values
):
    """Fine-tuning accepts future covariates whose values are None, an empty ndarray, or an empty tensor."""
    baseline_before = pipeline.predict(inputs, prediction_length=prediction_length)

    # Blank out every future covariate in a deep copy; the untouched `inputs` are still used for prediction.
    ft_inputs = deepcopy(inputs)
    for ft_item in ft_inputs:
        future_covariates = ft_item["future_covariates"]
        for covariate_name in list(future_covariates):
            future_covariates[covariate_name] = ft_future_values

    finetuned_pipeline = pipeline.fit(
        ft_inputs,
        prediction_length=prediction_length,
        validation_inputs=ft_inputs,
        num_steps=20,
        min_past=1,
        eval_steps=10,
        logging_steps=10,
        batch_size=32,
    )

    finetuned_outputs = finetuned_pipeline.predict(inputs, prediction_length=prediction_length)
    baseline_after = pipeline.predict(inputs, prediction_length=prediction_length)

    assert isinstance(finetuned_outputs, list)
    assert len(finetuned_outputs) == len(expected_output_shapes)
    for before, tuned, after, shape in zip(baseline_before, finetuned_outputs, baseline_after, expected_output_shapes):
        validate_tensor(tuned, shape, dtype=torch.float32)
        assert torch.allclose(before, after)
        assert not torch.allclose(before, tuned)
        assert not torch.isnan(tuned).any()
@pytest.mark.parametrize(
    "inputs, prediction_length",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 10),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(29),
                    "past_covariates": {"feat_1": torch.rand(29), "feat_2": torch.rand(29)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
        ),
    ],
)
def test_when_input_time_series_are_too_short_then_finetuning_raises_error(pipeline, inputs, prediction_length):
    """Every series here is shorter than `min_past + prediction_length` (with min_past=prediction_length),
    so fine-tuning is expected to be left with an empty training dataset and raise."""
    fit_kwargs = dict(prediction_length=prediction_length, num_steps=5, min_past=prediction_length, batch_size=32)
    with pytest.raises(ValueError, match="The dataset is empty after filtering"):
        pipeline.fit(inputs, **fit_kwargs)
@pytest.mark.parametrize(
    "context_setup, future_setup",
    [
        # Targets only
        ({}, None),
        # Multiple targets with different context lengths
        ({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
        # With past covariates
        ({"covariates": ["cov1"]}, None),
        # With future covariates
        ({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates
        ({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates and different series order
        (
            {"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
            {"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
        ),
    ],
)
@pytest.mark.parametrize("freq", ["h", "D", "ME"])
def test_two_step_finetuning_with_df_input_works(pipeline, context_setup, future_setup, freq):
    """Fine-tune on list-of-dict inputs derived from a DataFrame, then forecast with ``predict_df``.

    Checks three things:
    * the fine-tuned pipeline returns a forecast frame with the expected rows, ids, target names,
      start timestamps and quantile columns;
    * ``fit`` leaves the original pipeline's forecasts unchanged;
    * the fine-tuned forecasts differ from the original ones.
    """
    prediction_length = 3
    df = create_df(**context_setup, freq=freq)
    forecast_start_times = get_forecast_start_times(df, freq)

    future_df = None
    if future_setup:
        future_ids = future_setup.get("series_ids", ["A", "B"])
        # Every future series spans exactly the forecast horizon.
        future_kwargs = {**future_setup, "n_points": [prediction_length] * len(future_ids)}
        future_df = create_future_df(forecast_start_times, **future_kwargs, freq=freq)

    series_ids = context_setup.get("series_ids", ["A", "B"])
    target_columns = context_setup.get("target_cols", ["target"])

    def forecast_with(some_pipeline):
        # Every forecast in this test uses the same data and horizon; only the pipeline differs.
        return some_pipeline.predict_df(
            df, future_df=future_df, target=target_columns, prediction_length=prediction_length
        )

    # Order matters: baseline forecast -> fine-tune -> fine-tuned forecast -> baseline forecast again.
    orig_result_before = forecast_with(pipeline)
    inputs, _, _ = convert_df_input_to_list_of_dicts_input(
        df,
        future_df=future_df,
        id_column="item_id",
        timestamp_column="timestamp",
        target_columns=target_columns,
        prediction_length=prediction_length,
    )
    ft_pipeline = pipeline.fit(inputs, prediction_length=prediction_length, num_steps=5, min_past=1, batch_size=32)
    result = forecast_with(ft_pipeline)
    orig_result_after = forecast_with(pipeline)

    # Structure of the fine-tuned forecast frame
    n_series, n_targets = len(series_ids), len(target_columns)
    expected_item_ids = np.array(series_ids).repeat(n_targets * prediction_length)
    expected_target_names = np.tile(np.array(target_columns).repeat(prediction_length), n_series)
    expected_start_times = pd.to_datetime(forecast_start_times).to_numpy()
    quantile_columns = [str(level) for level in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]]

    assert len(result) == n_series * n_targets * prediction_length
    assert "item_id" in result.columns and np.all(result["item_id"].to_numpy() == expected_item_ids)
    assert "target_name" in result.columns and np.all(result["target_name"].to_numpy() == expected_target_names)
    assert "timestamp" in result.columns and np.all(
        result.groupby("item_id")["timestamp"].min().to_numpy() == expected_start_times
    )
    assert "predictions" in result.columns
    assert all(column in result.columns for column in quantile_columns)

    # The original pipeline is untouched, while the fine-tuned one forecasts differently.
    baseline_before = orig_result_before["predictions"].to_numpy()
    assert np.allclose(baseline_before, orig_result_after["predictions"].to_numpy())
    assert not np.allclose(baseline_before, result["predictions"].to_numpy())
def test_when_predict_df_called_with_timeout_callback_then_timeout_error_is_raised(pipeline):
    """An ``after_batch`` callback with a tiny time budget must abort ``predict_df`` with ``TimeoutError``.

    The input (1000 series x 2048 points) is intentionally large so that prediction cannot finish
    within the 0.1 budget passed to ``timeout_callback``.
    """
    n_series = 1000
    big_df = create_df(series_ids=list(range(n_series)), n_points=[2048] * n_series)
    with pytest.raises(TimeoutError, match="time limit exceeded"):
        # The callback is created inline, right before the call, so its budget covers only predict_df.
        pipeline.predict_df(big_df, prediction_length=48, after_batch=timeout_callback(0.1))
@pytest.mark.parametrize("attn_implementation", ["eager", "sdpa"])
def test_pipeline_works_with_different_attention_implementations(attn_implementation):
    """Loading the dummy model with ``attn_implementation`` records it in the config and still predicts."""
    pipeline = BaseChronosPipeline.from_pretrained(
        DUMMY_MODEL_PATH, device_map="cpu", attn_implementation=attn_implementation
    )

    # The requested implementation must be what the loaded model config reports.
    assert pipeline.model.config._attn_implementation == attn_implementation

    # Simple univariate input: `num_series` series with 16 observations each
    num_series = 2
    prediction_length = 7
    inputs = torch.rand(num_series, 1, 16)
    outputs = pipeline.predict(inputs, prediction_length=prediction_length)

    # One output per input series, each of shape (1, num_quantiles, prediction_length)
    assert isinstance(outputs, list) and len(outputs) == num_series
    for out in outputs:
        validate_tensor(out, (1, DEFAULT_MODEL_NUM_QUANTILES, prediction_length), dtype=torch.float32)
@pytest.mark.parametrize("attn_implementation", ["eager", "sdpa"])
@pytest.mark.parametrize("output_attentions", [False, True])
def test_attention_implementations_with_output_attentions(attn_implementation, output_attentions):
    """Exercise a standalone ``MHA`` layer with each attention implementation and both ``output_attentions`` values.

    The hidden-state shape is always checked. Attention weights must be present with shape
    (batch, heads, seq, seq) when requested. When they are not requested, SDPA must return ``None``.
    """
    # Named so it does not shadow the module-level `config` dict loaded from the dummy model.
    core_config = Chronos2CoreConfig(
        d_model=128,
        d_kv=32,
        num_heads=4,
        dropout_rate=0.1,
        attn_implementation=attn_implementation,
    )
    layer = MHA(core_config, use_rope=True)
    layer.eval()  # disable dropout for the forward pass

    n_batch, n_steps = 2, 10
    n_heads, d_model = core_config.num_heads, core_config.d_model
    hidden = torch.randn(n_batch, n_steps, d_model)
    positions = torch.arange(n_steps).unsqueeze(0).expand(n_batch, -1)
    # All-zero mask of shape (batch, heads, seq, seq)
    attention_mask = torch.zeros(n_batch, n_heads, n_steps, n_steps)

    result = layer(
        hidden_states=hidden,
        mask=attention_mask,
        position_ids=positions,
        output_attentions=output_attentions,
    )

    assert result.hidden_states.shape == (n_batch, n_steps, d_model)
    if output_attentions:
        assert result.attn_weights is not None
        assert result.attn_weights.shape == (n_batch, n_heads, n_steps, n_steps)
    elif attn_implementation == "sdpa":
        # SDPA doesn't return weights
        assert result.attn_weights is None
def test_eager_and_sdpa_produce_identical_outputs(pipeline):
    """Eager and SDPA attention must give numerically matching forecasts on the full pipeline.

    Outputs are compared with ``torch.allclose(atol=1e-5, rtol=1e-4)`` rather than for bit-exact
    equality, because the two kernels may differ in floating-point rounding.
    """
    # Both pipelines are loaded explicitly, so the comparison does not depend on the attention
    # implementation that the `pipeline` fixture defaults to.
    pipeline_sdpa = BaseChronosPipeline.from_pretrained(
        DUMMY_MODEL_PATH, device_map="cpu", attn_implementation="sdpa", torch_dtype=torch.float32
    )
    pipeline_eager = BaseChronosPipeline.from_pretrained(
        DUMMY_MODEL_PATH, device_map="cpu", attn_implementation="eager", torch_dtype=torch.float32
    )
    prediction_length = 7

    def assert_eager_matches_sdpa(inputs):
        # Forecast the same inputs with both pipelines and compare them output by output.
        with torch.no_grad():
            outputs_eager = pipeline_eager.predict(inputs, prediction_length=prediction_length)
            outputs_sdpa = pipeline_sdpa.predict(inputs, prediction_length=prediction_length)
        assert len(outputs_eager) == len(outputs_sdpa)
        for out_eager, out_sdpa in zip(outputs_eager, outputs_sdpa):
            assert torch.allclose(out_eager, out_sdpa, atol=1e-5, rtol=1e-4)

    # Case 1: simple univariate input
    assert_eager_matches_sdpa(torch.rand(2, 1, 16))

    # Case 2: multivariate targets with numeric and categorical covariates, which exercises group attention
    inputs_grouped = [
        {
            "target": np.random.randn(2, 36),
            "past_covariates": {
                "temperature": np.random.randn(36),
                "weather_type": np.random.choice(["sunny", "cloudy", "rainy"], size=36),
            },
            "future_covariates": {
                "temperature": np.random.randn(prediction_length),
                "weather_type": np.random.choice(["sunny", "cloudy", "rainy"], size=prediction_length),
            },
        }
        for _ in range(5)
    ]
    assert_eager_matches_sdpa(inputs_grouped)
def test_pipeline_can_be_finetuned_with_preprocessed_hf_dataset(pipeline):
    """Test that fine-tuning works with preprocessed inputs from a HuggingFace Dataset.

    The inputs are converted with ``prepare_inputs(..., mode="train")`` up front. They are then stored
    in a torch-formatted ``datasets.Dataset`` and passed to ``fit`` with ``convert_inputs=False``,
    so ``fit`` consumes them as already prepared. The fine-tuned pipeline must then forecast the raw
    inputs without NaNs.
    """
    from chronos.chronos2.dataset import prepare_inputs

    prediction_length = 8
    raw_inputs = [{"target": torch.rand(20)}, {"target": torch.rand(25)}, {"target": torch.rand(30)}]

    # Preprocess and convert to HF Dataset (simulating Arrow-based lazy loading)
    prepared_inputs = prepare_inputs(raw_inputs, prediction_length=prediction_length, min_past=1, mode="train")
    hf_dataset = datasets.Dataset.from_list(prepared_inputs).with_format("torch")

    # Fine-tune with preprocessed inputs; convert_inputs=False because they are already prepared
    ft_pipeline = pipeline.fit(
        hf_dataset, prediction_length=prediction_length, num_steps=5, min_past=1, batch_size=32, convert_inputs=False
    )

    # Verify fine-tuned model can predict: one (1, num_quantiles, prediction_length) output per input, no NaNs
    ft_outputs = ft_pipeline.predict(raw_inputs, prediction_length=prediction_length)
    assert len(ft_outputs) == len(raw_inputs)
    for ft_out in ft_outputs:
        assert ft_out.shape == (1, DEFAULT_MODEL_NUM_QUANTILES, prediction_length)
        assert not torch.isnan(ft_out).any()