Mirror of https://github.com/amazon-science/chronos-forecasting
(last synced 2026-05-23 09:39:35 +00:00)
*Issue #, if available:* *Description of changes:* Rename all variables and methods that refer to "task" in `dataset.py` to use `input` instead: - `PreparedTask` → `PreparedInput` - `self.tasks` → `self.inputs` - `prepare_tasks` → `prepare_inputs` - `validate_and_prepare_single_dict_task` → `validate_and_prepare_single_dict_input` - All `task_` prefixed local variables renamed (e.g., `task_target` → `target`, `task_context` → `context`, `task_past_tensor` → `past_tensor`, etc.) By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1169 lines
48 KiB
Python
1169 lines
48 KiB
Python
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
import json
|
|
from copy import deepcopy
|
|
from pathlib import Path
|
|
|
|
import datasets
|
|
import fev
|
|
import numpy as np
|
|
import pandas as pd
|
|
import pytest
|
|
import torch
|
|
|
|
from chronos import BaseChronosPipeline, Chronos2Pipeline
|
|
from chronos.chronos2.config import Chronos2CoreConfig
|
|
from chronos.chronos2.layers import MHA
|
|
from chronos.df_utils import convert_df_input_to_list_of_dicts_input
|
|
from test.util import create_df, create_future_df, get_forecast_start_times, validate_tensor, timeout_callback
|
|
|
|
DUMMY_MODEL_PATH = Path(__file__).parent / "dummy-chronos2-model"

# Read the dummy checkpoint's config once at import time so expected output shapes can be derived from it.
# JSON is UTF-8 by specification, so decode explicitly instead of relying on the platform's locale encoding.
with open(DUMMY_MODEL_PATH / "config.json", encoding="utf-8") as fp:
    config = json.load(fp)

# Number of quantiles the dummy model emits by default; this is the middle axis of the tensors
# returned by `predict` in the tests below.
DEFAULT_MODEL_NUM_QUANTILES = len(config["chronos_config"]["quantiles"])
|
|
|
|
|
|
@pytest.fixture
def pipeline() -> Chronos2Pipeline:
    """Provide a freshly loaded copy of the on-disk dummy Chronos-2 checkpoint, placed on the CPU."""
    loaded_pipeline = BaseChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu")
    return loaded_pipeline
|
|
|
|
|
|
def test_base_chronos2_pipeline_loads_from_s3():
    """Smoke test: the Chronos-2 checkpoint at the S3 URI loads onto the CPU (requires S3 access)."""
    checkpoint_uri = "s3://autogluon/chronos-2"
    BaseChronosPipeline.from_pretrained(checkpoint_uri, device_map="cpu")
|
|
|
|
|
|
def test_base_chronos2_pipeline_loads_from_hf():
    """Smoke test: the `amazon/chronos-2` checkpoint identifier loads onto the CPU (requires network access)."""
    checkpoint_id = "amazon/chronos-2"
    BaseChronosPipeline.from_pretrained(checkpoint_id, device_map="cpu")
|
|
|
|
|
|
def test_chronos2_lora_pipeline_loads_from_disk():
    """Smoke test: the dummy LoRA checkpoint directory stored next to this file loads onto the CPU."""
    lora_checkpoint_dir = Path(__file__).parent / "dummy-chronos2-lora"
    Chronos2Pipeline.from_pretrained(lora_checkpoint_dir, device_map="cpu")
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 7, [(1, DEFAULT_MODEL_NUM_QUANTILES, 7)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27, [(3, DEFAULT_MODEL_NUM_QUANTILES, 27)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            68,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
            ],
        ),
        # Homogeneous univariate list of dicts with target only
        (
            [{"target": torch.rand(10)}, {"target": torch.rand(110)}, {"target": torch.rand(17)}],
            5,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 5)] * 3,
        ),
        # Homogeneous multivariate list of dicts with target only
        (
            [{"target": torch.rand(2, 10)}, {"target": torch.rand(2, 110)}, {"target": torch.rand(2, 17)}],
            16,
            [(2, DEFAULT_MODEL_NUM_QUANTILES, 16)] * 3,
        ),
        # Homogeneous list of dicts with target and past-only covariates
        (
            [
                {"target": torch.rand(10), "past_covariates": {"feat_1": torch.rand(10)}},
                {"target": torch.rand(110), "past_covariates": {"feat_1": torch.rand(110)}},
                {"target": torch.rand(17), "past_covariates": {"feat_1": torch.rand(17)}},
            ],
            10,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 10)] * 3,
        ),
        # Homogeneous list of dicts with target, past-only and known future covariates
        # (future covariate length equals prediction_length)
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        ),
        # Heterogeneous list of dicts with different mix of tasks: torch and numpy arrays,
        # univariate and multivariate targets, and a categorical (string) covariate
        (
            [
                {
                    "target": torch.rand(100),
                    "past_covariates": {"temperature": torch.rand(100), "precipitation": torch.rand(100)},
                    "future_covariates": {"temperature": torch.rand(200)},
                },
                {"target": torch.rand(2, 150), "past_covariates": {"wind_speed": torch.rand(150)}},
                {
                    "target": np.random.rand(150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {
                    "target": np.random.rand(3, 150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {"target": torch.rand(1, 150)},
            ],
            200,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (3, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
            ],
        ),
    ],
)
def test_when_input_is_valid_then_pipeline_can_predict(pipeline, inputs, prediction_length, expected_output_shapes):
    """`predict` returns a list with one float32 tensor per input series.

    Per the expected shapes above, each tensor is (n_target_rows, DEFAULT_MODEL_NUM_QUANTILES, prediction_length),
    where n_target_rows is 1 for 1-d targets and the leading dimension for 2-d targets.
    """
    outputs = pipeline.predict(inputs, prediction_length=prediction_length)

    assert isinstance(outputs, list) and len(outputs) == len(expected_output_shapes)
    for out, expected_shape in zip(outputs, expected_output_shapes):
        validate_tensor(out, expected_shape, dtype=torch.float32)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, prediction_length, quantile_levels, expected_output_shapes",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 7, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], [(1, 7, 9)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8], [(3, 27, 8)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            68,
            [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            [(1, 68, 10), (2, 68, 10), (1, 68, 10)],
        ),
        # Homogeneous univariate list of dicts with target only
        (
            [{"target": torch.rand(10)}, {"target": torch.rand(110)}, {"target": torch.rand(17)}],
            5,
            [0.1, 0.5, 0.9],
            [(1, 5, 3)] * 3,
        ),
        # Homogeneous multivariate list of dicts with target only
        (
            [{"target": torch.rand(2, 10)}, {"target": torch.rand(2, 110)}, {"target": torch.rand(2, 17)}],
            16,
            [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99],
            [(2, 16, 11)] * 3,
        ),
        # Homogeneous list of dicts with target and past-only covariates
        (
            [
                {"target": torch.rand(10), "past_covariates": {"feat_1": torch.rand(10)}},
                {"target": torch.rand(110), "past_covariates": {"feat_1": torch.rand(110)}},
                {"target": torch.rand(17), "past_covariates": {"feat_1": torch.rand(17)}},
            ],
            10,
            [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            [(1, 10, 9)] * 3,
        ),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99],
            [(1, 15, 13)] * 3,
        ),
        # Heterogeneous list of dicts with different mix of tasks
        (
            [
                {
                    "target": torch.rand(100),
                    "past_covariates": {"temperature": torch.rand(100), "precipitation": torch.rand(100)},
                    "future_covariates": {"temperature": torch.rand(200)},
                },
                {"target": torch.rand(2, 150), "past_covariates": {"wind_speed": torch.rand(150)}},
                {
                    "target": np.random.rand(150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {
                    "target": np.random.rand(3, 150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {"target": torch.rand(1, 150)},
            ],
            200,
            [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99],
            [(1, 200, 11), (2, 200, 11), (1, 200, 11), (3, 200, 11), (1, 200, 11)],
        ),
    ],
)
def test_when_input_is_valid_then_pipeline_can_predict_quantiles(
    pipeline, inputs, prediction_length, quantile_levels, expected_output_shapes
):
    """`predict_quantiles` returns parallel lists of quantile and mean forecasts, one entry per input.

    Per the expected shapes above, each quantile tensor is (n_target_rows, prediction_length, len(quantile_levels)),
    and the matching mean tensor has the same shape without the trailing quantile axis.
    """
    quantiles, mean = pipeline.predict_quantiles(
        inputs, prediction_length=prediction_length, quantile_levels=quantile_levels
    )

    assert isinstance(quantiles, list) and len(quantiles) == len(expected_output_shapes)
    assert isinstance(mean, list) and len(mean) == len(expected_output_shapes)
    for out_q, out_m, expected_shape in zip(quantiles, mean, expected_output_shapes):
        validate_tensor(out_q, expected_shape, dtype=torch.float32)
        # Mean drops the quantile axis
        validate_tensor(out_m, expected_shape[:-1], dtype=torch.float32)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, error_match_string",
    [
        # A bare tensor input must be 3-d
        (torch.rand(16), "should be 3-d with shape"),
        (torch.rand(4, 3), "should be 3-d with shape"),
        # A list of tensors may only contain 1-d or 2-d elements
        ([torch.rand(1, 2, 100), torch.rand(120)], "the elements should either be 1-d"),
        # Unknown dict keys are rejected
        ([{"target": torch.rand(10)}, {"target": torch.rand(1, 2, 17), "extra_key": []}], "Found invalid keys"),
        # A dict `target` may only be 1-d or 2-d
        ([{"target": torch.rand(10)}, {"target": torch.rand(1, 2, 17)}], "`target` should either be 1-d with shape"),
        # `past_covariates` must be a mapping, not a tensor
        ([{"target": torch.rand(10), "past_covariates": torch.rand(10)}], "Found invalid type for `past_covariates`"),
        # Past covariate length (10) does not match the second target's length (17)
        (
            [
                {"target": torch.rand(10), "past_covariates": {"feat_1": torch.rand(10)}},
                {"target": torch.rand(17), "past_covariates": {"feat_1": torch.rand(10)}},
            ],
            "`past_covariates` must be 1-d with length",
        ),
        # `future_covariates` must be a mapping, not a tensor
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat": torch.rand(10)},
                    "future_covariates": torch.rand(10),
                }
            ],
            "Found invalid type for `future_covariates`",
        ),
        # `future_covariates` contains a key ("extra") absent from `past_covariates`
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat": torch.rand(10)},
                    "future_covariates": {"feat": torch.rand(10), "extra": torch.rand(10)},
                }
            ],
            "Expected keys in `future_covariates`",
        ),
        # Future covariate length (17) does not match prediction_length (10, set in the test body)
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat": torch.rand(10)},
                    "future_covariates": {"feat": torch.rand(17)},
                }
            ],
            "`future_covariates` must be 1-d with length",
        ),
    ],
)
def test_when_input_is_invalid_then_predict_raises_value_error(pipeline, inputs, error_match_string):
    """Malformed inputs make `predict` raise ValueError whose message matches `error_match_string` (a regex)."""
    with pytest.raises(ValueError, match=error_match_string):
        _ = pipeline.predict(inputs, prediction_length=10)
|
|
|
|
|
|
@pytest.mark.parametrize("dtype", [torch.float32, torch.bfloat16])
@pytest.mark.parametrize("input_dtype", [torch.float32, torch.bfloat16, torch.int64])
def test_pipeline_predict_can_handle_different_model_and_input_dtypes(dtype: torch.dtype, input_dtype: torch.dtype):
    """`predict` returns float32 forecasts whatever the model's `torch_dtype` and the context dtype are."""
    # Use the shared checkpoint path so this test loads the same dummy model as the `pipeline` fixture.
    pipeline = BaseChronosPipeline.from_pretrained(DUMMY_MODEL_PATH, device_map="cpu", torch_dtype=dtype)

    # input: tensor of shape (batch_size=4, n_variates=3, context_length=16).
    # Values lie in [10, 20); an int64 cast truncates them to the integers 10..19.
    context = 10 * torch.rand(size=(4, 3, 16)) + 10
    context = context.to(dtype=input_dtype)
    expected_num_quantiles = len(pipeline.quantiles)

    quantiles = pipeline.predict(context, prediction_length=7)
    for quantiles_item in quantiles:
        validate_tensor(quantiles_item, (3, expected_num_quantiles, 7), dtype=torch.float32)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, expected_output_shapes",
    [
        # NOTE: d_model for the dummy model is 6
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), [(1, 3, 6)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), [(3, 5, 6)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            [(1, 12, 6), (2, 12, 6), (1, 12, 6)],
        ),
    ],
)
def test_when_input_is_valid_then_pipeline_can_embed(pipeline, inputs, expected_output_shapes):
    """`embed` yields one float32 embedding per input plus a pair of per-row statistics.

    The statistics are presumably the location and scale used for normalization; each must have
    shape (n_target_rows, 1).
    """
    embeddings, loc_scale_pairs = pipeline.embed(inputs)

    n_expected = len(expected_output_shapes)
    assert isinstance(embeddings, list) and len(embeddings) == n_expected and len(loc_scale_pairs) == n_expected

    for embedding, loc_scale, shape in zip(embeddings, loc_scale_pairs, expected_output_shapes):
        validate_tensor(embedding, shape, dtype=torch.float32)
        per_row_shape = (shape[0], 1)
        for stat_idx in (0, 1):
            validate_tensor(loc_scale[stat_idx], per_row_shape, dtype=torch.float32)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "task_kwargs",
    [
        # Univariate task with the default metric
        {"dataset_path": "autogluon/chronos_datasets", "dataset_config": "monash_m1_yearly", "horizon": 8},
        # Quantile metric with custom quantile levels
        {
            "dataset_path": "autogluon/chronos_datasets",
            "dataset_config": "monash_m1_yearly",
            "horizon": 8,
            "eval_metric": "WQL",
            "quantile_levels": [0.1, 0.2],
        },
        # Multiple target columns
        {
            "dataset_path": "autogluon/fev_datasets",
            "dataset_config": "ETT_1H",
            "horizon": 27,
            "target": ["HULL", "HUFL", "OT"],
        },
        # Single target with past-only covariates
        {
            "dataset_path": "autogluon/fev_datasets",
            "dataset_config": "ETT_1H",
            "horizon": 34,
            "target": "OT",
            "past_dynamic_columns": ["HULL", "HUFL"],
        },
        # Single target with past-only and known future covariates
        {
            "dataset_path": "autogluon/fev_datasets",
            "dataset_config": "ETT_1H",
            "horizon": 34,
            "target": "OT",
            "past_dynamic_columns": ["HULL", "HUFL"],
            "known_dynamic_columns": ["LULL"],
        },
    ],
)
def test_pipeline_can_evaluate_on_dummy_fev_task(pipeline, task_kwargs):
    """`predict_fev` produces per-window predictions that `fev.Task.evaluation_summary` can score.

    NOTE(review): the datasets are referenced by remote paths, so this presumably needs network access — confirm.
    """
    task = fev.Task(**task_kwargs)
    predictions_per_window, inference_time_s = pipeline.predict_fev(task)

    assert isinstance(inference_time_s, float)
    assert isinstance(predictions_per_window, list) and all(
        isinstance(pred, datasets.DatasetDict) for pred in predictions_per_window
    )

    eval_summary = task.evaluation_summary(predictions_per_window, model_name="chronos-2")
    assert isinstance(eval_summary["test_error"], float)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "context_setup, future_setup",
    [
        # Targets only
        ({}, None),
        # Multiple targets with different context lengths
        ({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
        # With past covariates
        ({"covariates": ["cov1"]}, None),
        # With future covariates
        ({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates
        ({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates and different series order
        (
            {"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
            {"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
        ),
    ],
)
@pytest.mark.parametrize("freq", ["s", "min", "30min", "h", "D", "W", "ME", "QE", "YE"])
@pytest.mark.parametrize("prediction_length", [1, 4])
@pytest.mark.parametrize("validate_inputs", [True, False])
def test_predict_df_works_for_valid_inputs(
    pipeline, context_setup, future_setup, freq, validate_inputs, prediction_length
):
    """`predict_df` returns a long-format frame with one row per (series, target, forecast step).

    The rows follow the series order of `df` and then target order, and contain `item_id`, `target_name`,
    `timestamp`, `predictions` and the quantile columns "0.1" ... "0.9".
    """
    df = create_df(**context_setup, freq=freq)
    forecast_start_times = get_forecast_start_times(df, freq)
    if future_setup:
        # Known covariates must cover exactly `prediction_length` steps per series
        series_ids = future_setup.get("series_ids", ["A", "B"])
        future_setup_with_n_points = {**future_setup, "n_points": [prediction_length] * len(series_ids)}
        future_df = create_future_df(forecast_start_times, **future_setup_with_n_points, freq=freq)
    else:
        future_df = None

    # ["A", "B"] mirrors the default series IDs assumed for `create_df`
    series_ids = context_setup.get("series_ids", ["A", "B"])
    target_columns = context_setup.get("target_cols", ["target"])
    n_series = len(series_ids)
    n_targets = len(target_columns)
    result = pipeline.predict_df(
        df,
        future_df=future_df,
        target=target_columns,
        prediction_length=prediction_length,
        validate_inputs=validate_inputs,
    )

    expected_rows = n_series * n_targets * prediction_length
    assert len(result) == expected_rows
    assert "item_id" in result.columns and np.all(
        result["item_id"].to_numpy() == np.array(series_ids).repeat(n_targets * prediction_length)
    )
    assert "target_name" in result.columns and np.all(
        result["target_name"].to_numpy() == np.tile(np.array(target_columns).repeat(prediction_length), n_series)
    )
    # NOTE(review): `groupby` sorts by item_id by default, so this comparison assumes
    # `get_forecast_start_times` also returns start times in sorted item_id order — confirm.
    assert "timestamp" in result.columns and np.all(
        result.groupby("item_id")["timestamp"].min().to_numpy() == pd.to_datetime(forecast_start_times).to_numpy()
    )
    assert "predictions" in result.columns
    assert all(str(q) in result.columns for q in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])
|
|
|
|
|
|
@pytest.mark.parametrize(
    "context_data, error_match",
    [
        # Missing timestamp column
        ({"item_id": ["A"], "target": [1.0]}, "df does not contain all"),
        # Insufficient data points
        ({"item_id": ["A"], "timestamp": ["2023-01-01"], "target": [1.0]}, "must have at least 3 data"),
    ],
)
def test_predict_df_df_validation_errors(pipeline, context_data, error_match):
    """A malformed context frame makes `predict_df` raise a ValueError matching `error_match`."""
    context_df = pd.DataFrame(context_data)

    with pytest.raises(ValueError, match=error_match):
        pipeline.predict_df(context_df)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "future_data, error_match",
    [
        # Missing timestamp column
        ({"item_id": ["A"], "cov1": [1.0]}, "future_df does not contain all"),
        # target in future_df
        (
            {"item_id": ["A"], "timestamp": ["2023-01-01"], "cov1": [1.0], "target": [1.0]},
            "future_df cannot contain target",
        ),
        # Extra columns in future_df ("cov2" is absent from the context frame)
        (
            {"item_id": ["A"], "timestamp": ["2023-01-01"], "cov1": [1.0], "cov2": [1.0]},
            "future_df cannot contain columns not present",
        ),
    ],
)
def test_predict_df_future_df_validation_errors(pipeline, future_data, error_match):
    """A malformed `future_df` makes `predict_df` raise a ValueError matching `error_match`."""
    context_df = create_df(series_ids=["A", "B"], covariates=["cov1"], freq="h")
    bad_future_df = pd.DataFrame(future_data)

    with pytest.raises(ValueError, match=error_match):
        pipeline.predict_df(context_df, future_df=bad_future_df)
|
|
|
|
|
|
@pytest.mark.parametrize("validate_inputs", [True, False])
def test_predict_df_with_non_uniform_timestamps_raises_error(pipeline, validate_inputs):
    """A series with irregular spacing has no inferable frequency, so `predict_df` must fail."""
    df = create_df()

    # Ten daily timestamps for series A with 2023-01-03 skipped: 01, 02, then 04 through 11.
    irregular_timestamps = ["2023-01-01", "2023-01-02"] + [f"2023-01-{day:02d}" for day in range(4, 12)]
    df.loc[df["item_id"] == "A", "timestamp"] = irregular_timestamps

    # Either a validation ValueError or an internal AssertionError is accepted, as long as the message matches.
    with pytest.raises((ValueError, AssertionError), match="not infer frequency"):
        pipeline.predict_df(df, validate_inputs=validate_inputs)
|
|
|
|
|
|
def test_predict_df_with_inconsistent_frequencies_raises_error(pipeline):
    """Series sampled at different frequencies (daily A, monthly B) cannot be forecast together."""
    daily_timestamps = [f"2023-01-{day:02d}" for day in range(1, 6)]
    monthly_timestamps = [f"2023-{month:02d}-01" for month in range(1, 6)]
    df = pd.DataFrame(
        {
            "item_id": ["A"] * 5 + ["B"] * 5,
            "timestamp": daily_timestamps + monthly_timestamps,
            "target": [1.0] * 10,
        }
    )

    with pytest.raises(ValueError, match="same frequency"):
        pipeline.predict_df(df)
|
|
|
|
|
|
def test_predict_df_with_future_df_missing_series_raises_error(pipeline):
    """`future_df` must contain every series in `df`; omitting one raises a ValueError."""
    df = create_df(series_ids=["A", "B"], covariates=["cov1"])
    # future_df deliberately omits series "B"
    future_df = create_future_df(get_forecast_start_times(df), series_ids=["A"], covariates=["cov1"])

    with pytest.raises(ValueError, match="same time series IDs"):
        pipeline.predict_df(df, future_df=future_df)
|
|
|
|
|
|
def test_predict_df_with_future_df_with_different_freq_raises_error(pipeline):
    """Hourly context paired with daily future covariates is rejected because the timestamps don't line up."""
    hourly_df = create_df(series_ids=["A", "B"], covariates=["cov1"], freq="h")
    start_times = get_forecast_start_times(hourly_df)
    daily_future_df = create_future_df(
        start_times, series_ids=["A", "B"], n_points=[3, 3], covariates=["cov1"], freq="D"
    )

    with pytest.raises(ValueError, match="future_df timestamps do not match"):
        pipeline.predict_df(hourly_df, future_df=daily_future_df, prediction_length=3)
|
|
|
|
|
|
def test_predict_df_with_future_df_with_different_lengths_raises_error(pipeline):
    """Every series in `future_df` must span exactly `prediction_length` steps; series B has 7 instead of 3."""
    context_df = create_df(series_ids=["A", "B"], covariates=["cov1"])
    start_times = get_forecast_start_times(context_df)
    ragged_future_df = create_future_df(start_times, series_ids=["A", "B"], n_points=[3, 7], covariates=["cov1"])

    with pytest.raises(ValueError, match="future_df must contain prediction"):
        pipeline.predict_df(context_df, future_df=ragged_future_df, prediction_length=3)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "context_setup, future_setup",
    [
        # Targets only
        ({}, None),
        # Multiple targets with different context lengths
        ({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
        # With past covariates
        ({"covariates": ["cov1"]}, None),
        # With future covariates
        ({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates
        ({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates and different series order
        (
            {"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
            {"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
        ),
    ],
)
@pytest.mark.parametrize("prediction_length", [1, 4])
def test_predict_df_outputs_different_results_with_cross_learning_enabled(
    pipeline, context_setup, future_setup, prediction_length
):
    """Toggling `cross_learning` in `predict_df` must change the point predictions for the same data."""
    freq = "h"
    df = create_df(**context_setup, freq=freq)
    forecast_start_times = get_forecast_start_times(df, freq)
    if future_setup:
        # Known covariates must cover exactly `prediction_length` steps per series
        series_ids = future_setup.get("series_ids", ["A", "B"])
        future_setup_with_n_points = {**future_setup, "n_points": [prediction_length] * len(series_ids)}
        future_df = create_future_df(forecast_start_times, **future_setup_with_n_points, freq=freq)
    else:
        future_df = None

    target_columns = context_setup.get("target_cols", ["target"])
    result_with_cross_learning = pipeline.predict_df(
        df,
        future_df=future_df,
        target=target_columns,
        prediction_length=prediction_length,
        cross_learning=True,
    )
    result_without_cross_learning = pipeline.predict_df(
        df,
        future_df=future_df,
        target=target_columns,
        prediction_length=prediction_length,
        cross_learning=False,
    )

    assert not np.array_equal(result_with_cross_learning["predictions"], result_without_cross_learning["predictions"])
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous univariate task
        (torch.rand(4, 1, 16), 7, [(1, DEFAULT_MODEL_NUM_QUANTILES, 7)] * 4),
        # Homogeneous multivariate task
        (torch.rand(4, 3, 37), 27, [(3, DEFAULT_MODEL_NUM_QUANTILES, 27)] * 4),
        # Heterogeneous tasks with different history lengths
        (
            [torch.rand(100), torch.rand(2, 150), torch.rand(120)],
            68,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 68),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 68),
            ],
        ),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        ),
        # Heterogeneous list of dicts with different mix of tasks
        (
            [
                {
                    "target": torch.rand(1000),
                    "past_covariates": {"temperature": torch.rand(1000), "precipitation": torch.rand(1000)},
                    "future_covariates": {"temperature": torch.rand(200)},
                },
                {"target": torch.rand(2, 150), "past_covariates": {"wind_speed": torch.rand(150)}},
                {
                    "target": np.random.rand(150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {
                    "target": np.random.rand(3, 150),
                    "past_covariates": {
                        "numeric_covariate_1": np.random.rand(150),
                        "numeric_covariate_2": np.random.rand(150),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=150),
                    },
                    "future_covariates": {
                        "numeric_covariate_1": np.random.rand(200),
                        "cat_covariate": np.random.choice(["A", "B", "C", "D", "E"], size=200),
                    },
                },
                {"target": torch.rand(1, 150)},
            ],
            200,
            [
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (2, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (3, DEFAULT_MODEL_NUM_QUANTILES, 200),
                (1, DEFAULT_MODEL_NUM_QUANTILES, 200),
            ],
        ),
    ],
)
@pytest.mark.parametrize("finetune_mode", ["full", "lora"])
def test_when_input_is_valid_then_pipeline_can_be_finetuned(
    pipeline, inputs, prediction_length, expected_output_shapes, finetune_mode
):
    """`fit` returns a new fine-tuned pipeline and leaves the original one untouched.

    The fine-tuned forecasts must have the expected shapes, differ from the pretrained forecasts, and
    contain no NaNs. The original pipeline must give the same forecasts before and after `fit`.
    """
    # Get outputs before fine-tuning
    orig_outputs_before = pipeline.predict(inputs, prediction_length=prediction_length)
    ft_pipeline = pipeline.fit(
        inputs,
        prediction_length=prediction_length,
        num_steps=5,
        min_past=1,
        batch_size=32,
        finetune_mode=finetune_mode,
    )
    # Get outputs from fine-tuned pipeline
    ft_outputs = ft_pipeline.predict(inputs, prediction_length=prediction_length)
    # Get outputs from original pipeline after fine-tuning
    orig_outputs_after = pipeline.predict(inputs, prediction_length=prediction_length)

    # Check output shapes are correct and output is different from the pretrained model outputs
    assert isinstance(ft_outputs, list) and len(ft_outputs) == len(expected_output_shapes)
    for orig_out_before, finetuned_out, orig_out_after, expected_shape in zip(
        orig_outputs_before, ft_outputs, orig_outputs_after, expected_output_shapes
    ):
        validate_tensor(finetuned_out, expected_shape, dtype=torch.float32)
        # `fit` must not modify the weights of the pipeline it was called on
        assert torch.allclose(orig_out_before, orig_out_after)
        assert not torch.allclose(orig_out_before, finetuned_out)
        assert not torch.isnan(finetuned_out).any()
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        )
    ],
)
def test_pipeline_can_be_finetuned_with_validation(pipeline, inputs, prediction_length, expected_output_shapes):
    """`fit` also works when `validation_inputs` is given, with periodic evaluation and logging.

    The training inputs are reused as validation inputs. The same post-conditions as plain fine-tuning
    are then checked.
    """
    # Get outputs before fine-tuning
    orig_outputs_before = pipeline.predict(inputs, prediction_length=prediction_length)
    ft_pipeline = pipeline.fit(
        inputs,
        prediction_length=prediction_length,
        validation_inputs=inputs,
        num_steps=20,
        min_past=1,
        eval_steps=10,
        logging_steps=10,
        batch_size=32,
    )
    # Get outputs from fine-tuned pipeline
    ft_outputs = ft_pipeline.predict(inputs, prediction_length=prediction_length)
    # Get outputs from original pipeline after fine-tuning
    orig_outputs_after = pipeline.predict(inputs, prediction_length=prediction_length)

    # Check output shapes are correct and output is different from the pretrained model outputs
    assert isinstance(ft_outputs, list) and len(ft_outputs) == len(expected_output_shapes)
    for orig_out_before, finetuned_out, orig_out_after, expected_shape in zip(
        orig_outputs_before, ft_outputs, orig_outputs_after, expected_output_shapes
    ):
        validate_tensor(finetuned_out, expected_shape, dtype=torch.float32)
        # `fit` must not modify the weights of the pipeline it was called on
        assert torch.allclose(orig_out_before, orig_out_after)
        assert not torch.allclose(orig_out_before, finetuned_out)
        assert not torch.isnan(finetuned_out).any()
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, prediction_length, expected_output_shapes",
    [
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(99),
                    "past_covariates": {"feat_1": torch.rand(99), "feat_2": torch.rand(99)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
            [(1, DEFAULT_MODEL_NUM_QUANTILES, 15)] * 3,
        )
    ],
)
@pytest.mark.parametrize("ft_future_values", [None, np.array([]), torch.zeros(0)])
def test_pipeline_can_be_finetuned_with_empty_future_covariates(
    pipeline, inputs, prediction_length, expected_output_shapes, ft_future_values
):
    """`fit` accepts future-covariate entries that are None or empty (numpy or torch).

    Training and validation use the emptied copy. Predictions are made on the original, fully
    populated `inputs`.
    """
    # Get outputs before fine-tuning
    orig_outputs_before = pipeline.predict(inputs, prediction_length=prediction_length)
    # Replace future covariates with ft_future_values
    # (deepcopy so the parametrized `inputs` used for prediction stay intact)
    ft_inputs = deepcopy(inputs)
    for idx, task in enumerate(inputs):
        for key in task["future_covariates"]:
            # NOTE(review): the same `ft_future_values` object is shared by every entry; this assumes
            # `fit` does not mutate it in place.
            ft_inputs[idx]["future_covariates"][key] = ft_future_values

    ft_pipeline = pipeline.fit(
        ft_inputs,
        prediction_length=prediction_length,
        validation_inputs=ft_inputs,
        num_steps=20,
        min_past=1,
        eval_steps=10,
        logging_steps=10,
        batch_size=32,
    )
    # Get outputs from fine-tuned pipeline
    ft_outputs = ft_pipeline.predict(inputs, prediction_length=prediction_length)
    # Get outputs from original pipeline after fine-tuning
    orig_outputs_after = pipeline.predict(inputs, prediction_length=prediction_length)

    # Check output shapes are correct and output is different from the pretrained model outputs
    assert isinstance(ft_outputs, list) and len(ft_outputs) == len(expected_output_shapes)
    for orig_out_before, finetuned_out, orig_out_after, expected_shape in zip(
        orig_outputs_before, ft_outputs, orig_outputs_after, expected_output_shapes
    ):
        validate_tensor(finetuned_out, expected_shape, dtype=torch.float32)
        # `fit` must not modify the weights of the pipeline it was called on
        assert torch.allclose(orig_out_before, orig_out_after)
        assert not torch.allclose(orig_out_before, finetuned_out)
        assert not torch.isnan(finetuned_out).any()
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs, prediction_length",
    [
        # Homogeneous univariate input (3D tensor)
        (torch.rand(4, 1, 16), 10),
        # Homogeneous multivariate input (3D tensor)
        (torch.rand(4, 3, 37), 27),
        # Homogeneous list of dicts with target, past-only and known future covariates
        (
            [
                {
                    "target": torch.rand(10),
                    "past_covariates": {"feat_1": torch.rand(10), "feat_2": torch.rand(10)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(29),
                    "past_covariates": {"feat_1": torch.rand(29), "feat_2": torch.rand(29)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
                {
                    "target": torch.rand(17),
                    "past_covariates": {"feat_1": torch.rand(17), "feat_2": torch.rand(17)},
                    "future_covariates": {"feat_1": torch.rand(15)},
                },
            ],
            15,
        ),
    ],
)
def test_when_input_time_series_are_too_short_then_finetuning_raises_error(pipeline, inputs, prediction_length):
    """Fine-tuning must fail with a ValueError when no series survives length filtering.

    ``min_past`` is set equal to ``prediction_length``; with that setting every series in
    the parametrized ``inputs`` is expected to be filtered out, leaving an empty dataset.
    """
    fit_options = {
        "prediction_length": prediction_length,
        "num_steps": 5,
        "min_past": prediction_length,
        "batch_size": 32,
    }
    with pytest.raises(ValueError, match="The dataset is empty after filtering"):
        pipeline.fit(inputs, **fit_options)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "context_setup, future_setup",
    [
        # Targets only
        ({}, None),
        # Multiple targets with different context lengths
        ({"target_cols": ["sales", "revenue", "profit"], "n_points": [10, 17]}, None),
        # With past covariates
        ({"covariates": ["cov1"]}, None),
        # With future covariates
        ({"covariates": ["cov1"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates
        ({"covariates": ["cov1", "cov2"]}, {"covariates": ["cov1"]}),
        # With past-only and future covariates, series listed in non-alphabetical order
        (
            {"series_ids": ["B", "C", "A", "Z"], "n_points": [10, 20, 100, 256], "covariates": ["cov1", "cov2"]},
            {"series_ids": ["B", "C", "A", "Z"], "covariates": ["cov1"]},
        ),
    ],
)
@pytest.mark.parametrize("freq", ["h", "D", "ME"])
def test_two_step_finetuning_with_df_input_works(pipeline, context_setup, future_setup, freq):
    """DataFrame inputs can be converted to list-of-dicts, used for fine-tuning, then forecast via predict_df.

    Checks that the fine-tuned forecasts have the expected row layout and columns and
    differ from the pretrained forecasts, while the original pipeline's forecasts are
    unchanged by fine-tuning.
    """
    prediction_length = 3
    default_series_ids = ["A", "B"]

    df = create_df(**context_setup, freq=freq)
    forecast_start_times = get_forecast_start_times(df, freq)

    future_df = None
    if future_setup:
        num_future_series = len(future_setup.get("series_ids", default_series_ids))
        future_kwargs = {**future_setup, "n_points": [prediction_length] * num_future_series}
        future_df = create_future_df(forecast_start_times, **future_kwargs, freq=freq)

    series_ids = context_setup.get("series_ids", default_series_ids)
    target_columns = context_setup.get("target_cols", ["target"])
    n_series, n_targets = len(series_ids), len(target_columns)

    def predict_with(model_pipeline):
        """Forecast the shared test DataFrame(s) with ``model_pipeline``."""
        return model_pipeline.predict_df(
            df, future_df=future_df, target=target_columns, prediction_length=prediction_length
        )

    # Forecasts from the pretrained model, before any fine-tuning
    orig_result_before = predict_with(pipeline)

    # fit() consumes list-of-dicts inputs, so convert the DataFrame representation first
    ft_inputs, _, _ = convert_df_input_to_list_of_dicts_input(
        df,
        future_df=future_df,
        id_column="item_id",
        timestamp_column="timestamp",
        target_columns=target_columns,
        prediction_length=prediction_length,
    )
    ft_pipeline = pipeline.fit(ft_inputs, prediction_length=prediction_length, num_steps=5, min_past=1, batch_size=32)

    # Forecast with the fine-tuned pipeline, then again with the original one
    result = predict_with(ft_pipeline)
    orig_result_after = predict_with(pipeline)

    # Fine-tuned forecasts: rows grouped by series, then by target, then by time step
    assert len(result) == n_series * n_targets * prediction_length

    assert "item_id" in result.columns
    expected_item_ids = np.repeat(series_ids, n_targets * prediction_length)
    assert np.all(result["item_id"].to_numpy() == expected_item_ids)

    assert "target_name" in result.columns
    expected_target_names = np.tile(np.repeat(target_columns, prediction_length), n_series)
    assert np.all(result["target_name"].to_numpy() == expected_target_names)

    assert "timestamp" in result.columns
    first_forecast_times = result.groupby("item_id")["timestamp"].min().to_numpy()
    assert np.all(first_forecast_times == pd.to_datetime(forecast_start_times).to_numpy())

    assert "predictions" in result.columns
    assert all(str(q) in result.columns for q in [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9])

    orig_predictions_before = orig_result_before["predictions"].to_numpy()
    # Fine-tuning must not modify the original pipeline ...
    assert np.allclose(orig_predictions_before, orig_result_after["predictions"].to_numpy())
    # ... but the fine-tuned pipeline must forecast differently
    assert not np.allclose(orig_predictions_before, result["predictions"].to_numpy())
|
|
|
|
|
|
def test_when_predict_df_called_with_timeout_callback_then_timeout_error_is_raised(pipeline):
    """predict_df surfaces a TimeoutError when its ``after_batch`` timeout callback trips.

    The input is deliberately large (1000 series x 2048 points) so that prediction cannot
    finish within the very small limit passed to ``timeout_callback`` (presumably seconds).
    """
    num_series = 1000
    large_df = create_df(series_ids=list(range(num_series)), n_points=[2048] * num_series)

    with pytest.raises(TimeoutError, match="time limit exceeded"):
        pipeline.predict_df(large_df, prediction_length=48, after_batch=timeout_callback(0.1))
|
|
|
|
|
|
@pytest.mark.parametrize("attn_implementation", ["eager", "sdpa"])
def test_pipeline_works_with_different_attention_implementations(attn_implementation):
    """Test that the pipeline works with different attention implementations.

    The dummy model is loaded with the requested ``attn_implementation``. The test checks
    that the choice is recorded on the model config and that prediction returns one
    ``(1, num_quantiles, prediction_length)`` float32 tensor per input series.
    """
    # Load the dummy model with the specified attention implementation
    pipeline = BaseChronosPipeline.from_pretrained(
        DUMMY_MODEL_PATH, device_map="cpu", attn_implementation=attn_implementation
    )

    # Verify the config has the correct attention implementation
    assert pipeline.model.config._attn_implementation == attn_implementation

    # Test prediction with a simple univariate batch
    num_series = 2
    prediction_length = 7
    inputs = torch.rand(num_series, 1, 16)

    outputs = pipeline.predict(inputs, prediction_length=prediction_length)

    # Check outputs are valid: one forecast tensor per input series
    assert isinstance(outputs, list) and len(outputs) == num_series
    for out in outputs:
        validate_tensor(out, (1, DEFAULT_MODEL_NUM_QUANTILES, prediction_length), dtype=torch.float32)
|
|
|
|
|
|
@pytest.mark.parametrize("attn_implementation", ["eager", "sdpa"])
@pytest.mark.parametrize("output_attentions", [False, True])
def test_attention_implementations_with_output_attentions(attn_implementation, output_attentions):
    """An MHA layer honours ``output_attentions`` for each attention implementation.

    The output hidden states must keep the input shape. When attention weights are
    requested they must be returned with shape (batch, heads, seq, seq). When they are
    not requested, the SDPA implementation must return ``None``; the eager implementation
    is not checked in that case.
    """
    batch_size, seq_len = 2, 10

    config = Chronos2CoreConfig(
        d_model=128,
        d_kv=32,
        num_heads=4,
        dropout_rate=0.1,
        attn_implementation=attn_implementation,
    )
    mha = MHA(config, use_rope=True)
    # Evaluation mode, so the configured dropout does not perturb the forward pass
    mha.eval()

    hidden_states = torch.randn(batch_size, seq_len, config.d_model)
    position_ids = torch.arange(seq_len).unsqueeze(0).expand(batch_size, -1)
    # All-zero mask of shape (batch, heads, seq, seq); presumably additive, i.e. nothing masked
    mask = torch.zeros(batch_size, config.num_heads, seq_len, seq_len)

    output = mha(
        hidden_states=hidden_states,
        mask=mask,
        position_ids=position_ids,
        output_attentions=output_attentions,
    )

    assert output.hidden_states.shape == (batch_size, seq_len, config.d_model)

    if output_attentions:
        assert output.attn_weights is not None
        assert output.attn_weights.shape == (batch_size, config.num_heads, seq_len, seq_len)
    elif attn_implementation == "sdpa":
        # SDPA doesn't return weights unless they are requested
        assert output.attn_weights is None
|
|
|
|
|
|
def test_eager_and_sdpa_produce_identical_outputs(pipeline):
    """Test that eager and SDPA implementations produce matching outputs on the full pipeline.

    "Matching" means ``torch.allclose`` with ``atol=1e-5, rtol=1e-4``, to allow for
    floating-point differences between the two attention kernels. Both a plain univariate
    batch and grouped multivariate inputs with past/future covariates (which exercise group
    attention) are compared.
    """
    # NOTE(review): the ``pipeline`` fixture is not used. Both pipelines are loaded
    # explicitly below so that the attention implementation of each one is known.
    prediction_length = 7

    def load_pipeline(attn_implementation):
        """Load the dummy model in float32 with the given attention implementation."""
        return BaseChronosPipeline.from_pretrained(
            DUMMY_MODEL_PATH, device_map="cpu", attn_implementation=attn_implementation, torch_dtype=torch.float32
        )

    pipeline_sdpa = load_pipeline("sdpa")
    pipeline_eager = load_pipeline("eager")

    def assert_outputs_match(inputs):
        """Predict ``inputs`` with both pipelines and require element-wise closeness."""
        with torch.no_grad():
            outputs_eager = pipeline_eager.predict(inputs, prediction_length=prediction_length)
            outputs_sdpa = pipeline_sdpa.predict(inputs, prediction_length=prediction_length)

        assert len(outputs_eager) == len(outputs_sdpa)
        for out_eager, out_sdpa in zip(outputs_eager, outputs_sdpa):
            # Should match exactly or very close (numerical precision)
            assert torch.allclose(out_eager, out_sdpa, atol=1e-5, rtol=1e-4)

    # Test 1: Simple univariate input
    assert_outputs_match(torch.rand(2, 1, 16))

    # Test 2: Multivariate inputs with covariates to test group attention
    inputs_grouped = [
        {
            "target": np.random.randn(2, 36),
            "past_covariates": {
                "temperature": np.random.randn(36),
                "weather_type": np.random.choice(["sunny", "cloudy", "rainy"], size=36),
            },
            "future_covariates": {
                "temperature": np.random.randn(prediction_length),
                "weather_type": np.random.choice(["sunny", "cloudy", "rainy"], size=prediction_length),
            },
        }
        for _ in range(5)
    ]
    assert_outputs_match(inputs_grouped)
|
|
|
|
|
|
def test_pipeline_can_be_finetuned_with_preprocessed_hf_dataset(pipeline):
    """Test that fine-tuning works with preprocessed inputs from a HuggingFace Dataset.

    Raw inputs are run through ``prepare_inputs`` ahead of time and wrapped in a
    torch-formatted ``datasets.Dataset``, which is then passed to ``fit`` with
    ``convert_inputs=False``. The fine-tuned pipeline must still forecast the raw inputs
    with the expected shape and without NaNs.
    """
    from chronos.chronos2.dataset import prepare_inputs

    prediction_length = 8
    raw_inputs = [{"target": torch.rand(20)}, {"target": torch.rand(25)}, {"target": torch.rand(30)}]

    # Preprocess and convert to HF Dataset (simulating Arrow-based lazy loading)
    prepared_inputs = prepare_inputs(raw_inputs, prediction_length=prediction_length, min_past=1, mode="train")
    hf_dataset = datasets.Dataset.from_list(prepared_inputs).with_format("torch")

    # hf_dataset already holds prepare_inputs output; convert_inputs=False presumably
    # makes fit skip its own conversion step
    ft_pipeline = pipeline.fit(
        hf_dataset, prediction_length=prediction_length, num_steps=5, min_past=1, batch_size=32, convert_inputs=False
    )

    # Verify the fine-tuned model can predict the raw (unprepared) inputs
    ft_outputs = ft_pipeline.predict(raw_inputs, prediction_length=prediction_length)
    assert len(ft_outputs) == len(raw_inputs)
    for ft_out in ft_outputs:
        assert ft_out.shape == (1, DEFAULT_MODEL_NUM_QUANTILES, prediction_length)
        assert not torch.isnan(ft_out).any()
|