Move input preparation outside of Chronos2Dataset

This commit is contained in:
Oleksandr Shchur 2026-02-18 13:57:29 +00:00
parent 1f099eb265
commit e55233d0c3
3 changed files with 346 additions and 84 deletions

View file

@ -15,6 +15,7 @@ from torch.utils.data import IterableDataset
if TYPE_CHECKING:
import datasets
import fev
from chronos.chronos2.preprocessing import PreparedTask, RawTask
TensorOrArray: TypeAlias = torch.Tensor | np.ndarray
@ -383,49 +384,63 @@ class Chronos2Dataset(IterableDataset):
Arguments
----------
inputs
Time series data. Must be a list of dictionaries where each dictionary may have the following keys.
- `target` (required): a 1-d or 2-d `torch.Tensor` or `np.ndarray` of shape (history_length,) or (n_variates, history_length).
Forecasts will be generated for items in `target`.
- `past_covariates` (optional): a dict of past-only covariates or past values of known future covariates. The keys of the dict
must be names of the covariates and values must be 1-d `torch.Tensor` or `np.ndarray` with length equal to the `history_length`
of `target`.
- `future_covariates` (optional): a dict of future values of known future covariates. The keys of the dict must be names of the
covariates and values must be 1-d `torch.Tensor` or `np.ndarray` with length equal to the `prediction_length`. All keys in
`future_covariates` must be a subset of the keys in `past_covariates`.
Note: when the mode is set to TRAIN, the values inside `future_covariates` are not technically used for training the model;
however, this key is used to infer which covariates are known into the future. Therefore, if your task contains known future covariates,
make sure that this key exists in `inputs`. The values of individual future covariates may be set to `None` or an empty array.
Time series data. Can be either:
1. Raw inputs (when `convert_inputs=True`, default): A sequence of dictionaries where each
dictionary may have the following keys:
- `target` (required): a 1-d or 2-d `torch.Tensor` or `np.ndarray` of shape (history_length,)
or (n_variates, history_length).
- `past_covariates` (optional): a dict of past-only covariates or past values of known future
covariates.
- `future_covariates` (optional): a dict of future values of known future covariates.
2. Pre-processed inputs (when `convert_inputs=False`): A sequence of prepared tasks, each with keys:
- `context`: 2-d array of shape (n_variates, history_length)
- `future_covariates`: 2-d array of shape (n_variates, prediction_length)
- `n_targets`, `n_covariates`, `n_future_covariates`: int metadata
Use `chronos.chronos2.preprocessing.prepare_tasks()` to create pre-processed inputs.
context_length
The maximum context length used for training or inference
prediction_length
The prediction horizon
batch_size
The batch size for training the model. Note that the batch size here means the number of time series, including target(s) and
covariates, that are input into the model. If your data has multiple target and/or covariates, the effective number of time series
tasks in a batch will be lower than this value.
The batch size for training the model. Note that the batch size here means the number of time series,
including target(s) and covariates, that are input into the model.
output_patch_size
The output patch size of the model. This is used to compute the number of patches needed to cover `prediction_length`
The output patch size of the model. This is used to compute the number of patches needed to cover
`prediction_length`
min_past
The minimum number of time steps the context must have during training. All time series shorter than `min_past + prediction_length`
are filtered out, by default 1
The minimum number of time steps the context must have during training. All time series shorter than
`min_past + prediction_length` are filtered out, by default 1
mode
`DatasetMode` governing whether to generate training, validation or test samples, by default "train"
convert_inputs
If True (default), preprocess raw inputs. If False, inputs are expected to be already preprocessed.
"""
def __init__(
self,
inputs: Sequence[Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]]],
inputs: "Sequence[PreparedTask | RawTask]",
context_length: int,
prediction_length: int,
batch_size: int,
output_patch_size: int,
min_past: int = 1,
mode: str | DatasetMode = DatasetMode.TRAIN,
convert_inputs: bool = True,
) -> None:
super().__init__()
assert mode in {DatasetMode.TRAIN, DatasetMode.VALIDATION, DatasetMode.TEST}, f"Invalid mode: {mode}"
self.tasks = Chronos2Dataset._prepare_tasks(inputs, prediction_length, min_past, mode)
if convert_inputs:
from chronos.chronos2.preprocessing import prepare_tasks
self.tasks = prepare_tasks(inputs, prediction_length, min_past, mode)
else:
from chronos.chronos2.preprocessing import validate_prepared_schema
validate_prepared_schema(inputs[0])
self.tasks = inputs
self.context_length = context_length
self.prediction_length = prediction_length
self.batch_size = batch_size
@ -433,51 +448,20 @@ class Chronos2Dataset(IterableDataset):
self.min_past = min_past
self.mode = mode
@staticmethod
def _prepare_tasks(
inputs: Sequence[Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]]],
prediction_length: int,
min_past: int,
mode: str | DatasetMode,
):
tasks = []
for idx, raw_task in enumerate(inputs):
if mode != DatasetMode.TEST:
raw_future_covariates = raw_task.get("future_covariates", {})
raw_future_covariates = cast(dict[str, TensorOrArray | None], raw_future_covariates)
if raw_future_covariates:
fixed_future_covariates = {}
for key, value in raw_future_covariates.items():
fixed_future_covariates[key] = (
np.full(prediction_length, np.nan) if value is None or len(value) == 0 else value
)
raw_task = {**raw_task, "future_covariates": fixed_future_covariates}
raw_task = cast(dict[str, TensorOrArray | Mapping[str, TensorOrArray]], raw_task)
# convert to a format compatible with model's forward
task = validate_and_prepare_single_dict_task(raw_task, idx, prediction_length)
if mode != DatasetMode.TEST and task[0].shape[-1] < min_past + prediction_length:
# filter tasks based on min_past + prediction_length
continue
tasks.append(task)
if len(tasks) == 0:
raise ValueError(
"The dataset is empty after filtering based on the length of the time series (length >= min_past + prediction_length). "
"Please provide longer time series or reduce `min_past` or `prediction_length`. "
)
return tasks
def _construct_slice(self, task_idx: int) -> tuple[torch.Tensor, torch.Tensor | None, torch.Tensor, int]:
(
task_past_tensor, # shape: (task_n_targets + task_n_covariates, history_length)
task_future_tensor,
task_n_targets,
task_n_covariates,
task_n_future_covariates,
) = self.tasks[task_idx]
task_past_tensor, task_future_tensor = task_past_tensor.clone(), task_future_tensor.clone()
task = self.tasks[task_idx]
# Convert numpy arrays to torch tensors if needed
context = task["context"]
future_cov = task["future_covariates"]
if isinstance(context, np.ndarray):
context = torch.from_numpy(context)
if isinstance(future_cov, np.ndarray):
future_cov = torch.from_numpy(future_cov)
task_past_tensor = context.clone().to(torch.float32)
task_future_tensor = future_cov.clone().to(torch.float32)
task_n_targets = task["n_targets"]
task_n_covariates = task["n_covariates"]
task_n_future_covariates = task["n_future_covariates"]
task_n_past_only_covariates = task_n_covariates - task_n_future_covariates
full_length = task_past_tensor.shape[-1]
@ -575,7 +559,7 @@ class Chronos2Dataset(IterableDataset):
while current_batch_size < self.batch_size:
task_idx = np.random.randint(len(self.tasks))
task_indices.append(task_idx)
current_batch_size += self.tasks[task_idx][0].shape[0]
current_batch_size += self.tasks[task_idx]["context"].shape[0]
yield self._build_batch(task_indices)
@ -587,7 +571,7 @@ class Chronos2Dataset(IterableDataset):
while task_idx < len(self.tasks) and current_batch_size < self.batch_size:
task_indices.append(task_idx)
current_batch_size += self.tasks[task_idx][0].shape[0]
current_batch_size += self.tasks[task_idx]["context"].shape[0]
task_idx += 1
yield self._build_batch(task_indices)
@ -631,7 +615,12 @@ class Chronos2Dataset(IterableDataset):
min_past: int = 1,
mode: str | DatasetMode = DatasetMode.TRAIN,
) -> "Chronos2Dataset":
"""Convert from different input formats to a Chronos2Dataset."""
"""Convert from different input formats to a Chronos2Dataset.
This method handles various input formats (tensors, list of tensors, list of dicts)
and creates a dataset with preprocessing.
"""
# Convert various input formats to list of dicts
if isinstance(inputs, (torch.Tensor, np.ndarray)):
inputs = convert_tensor_input_to_list_of_dicts_input(inputs)
elif isinstance(inputs, list) and all([isinstance(x, (torch.Tensor, np.ndarray)) for x in inputs]):
@ -652,4 +641,5 @@ class Chronos2Dataset(IterableDataset):
output_patch_size=output_patch_size,
min_past=min_past,
mode=mode,
convert_inputs=True,
)

View file

@ -115,6 +115,7 @@ class Chronos2Pipeline(BaseChronosPipeline):
callbacks: list["TrainerCallback"] | None = None,
remove_printer_callback: bool = False,
disable_data_parallel: bool = True,
convert_inputs: bool = True,
**extra_trainer_kwargs,
) -> "Chronos2Pipeline":
"""
@ -161,6 +162,12 @@ class Chronos2Pipeline(BaseChronosPipeline):
If True, all instances of `PrinterCallback` are removed from callbacks
disable_data_parallel
If True, ensures that DataParallel is disabled and training happens on a single GPU
convert_inputs
If True (default), preprocess raw inputs (convert tensors, encode categoricals, validate).
If False, inputs are expected to be already preprocessed (e.g., loaded from Arrow file
using `chronos.chronos2.preprocessing.ArrowTaskSequence` or prepared manually with
`chronos.chronos2.preprocessing.prepare_tasks`). This allows for efficient training on
large datasets that don't fit in memory.
**extra_trainer_kwargs
Extra kwargs are directly forwarded to `TrainingArguments`
@ -229,15 +236,27 @@ class Chronos2Pipeline(BaseChronosPipeline):
if min_past is None:
min_past = prediction_length
train_dataset = Chronos2Dataset.convert_inputs(
inputs=inputs,
context_length=context_length,
prediction_length=prediction_length,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
min_past=min_past,
mode=DatasetMode.TRAIN,
)
if convert_inputs:
train_dataset = Chronos2Dataset.convert_inputs(
inputs=inputs,
context_length=context_length,
prediction_length=prediction_length,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
min_past=min_past,
mode=DatasetMode.TRAIN,
)
else:
train_dataset = Chronos2Dataset(
inputs=inputs,
context_length=context_length,
prediction_length=prediction_length,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
min_past=min_past,
mode=DatasetMode.TRAIN,
convert_inputs=False,
)
if output_dir is None:
output_dir = Path("chronos-2-finetuned") / time.strftime("%Y-%m-%d_%H-%M-%S")
@ -291,14 +310,26 @@ class Chronos2Pipeline(BaseChronosPipeline):
callbacks = callbacks or []
if validation_inputs is not None:
# construct validation dataset
eval_dataset = Chronos2Dataset.convert_inputs(
inputs=validation_inputs,
context_length=context_length,
prediction_length=prediction_length,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
mode=DatasetMode.VALIDATION,
)
if convert_inputs:
eval_dataset = Chronos2Dataset.convert_inputs(
inputs=validation_inputs,
context_length=context_length,
prediction_length=prediction_length,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
mode=DatasetMode.VALIDATION,
)
else:
eval_dataset = Chronos2Dataset(
inputs=validation_inputs,
context_length=context_length,
prediction_length=prediction_length,
batch_size=batch_size,
output_patch_size=self.model_output_patch_size,
min_past=min_past,
mode=DatasetMode.VALIDATION,
convert_inputs=False,
)
# set validation parameters
training_kwargs["save_strategy"] = "steps"

View file

@ -0,0 +1,241 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Preprocessing utilities for Chronos-2 datasets.
This module provides functions to prepare raw time series data for training.
"""
from typing import Any, Iterable, Mapping, Sequence, TypedDict, cast
import numpy as np
import torch
from sklearn.preprocessing import OrdinalEncoder, TargetEncoder
from chronos.chronos2.dataset import DatasetMode
TensorOrArray = torch.Tensor | np.ndarray
# Type alias for raw input format
RawTask = Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]]
class PreparedTask(TypedDict):
    """Schema of one preprocessed time series task.

    Instances are produced by `prepare_single_task`. Rows of both arrays are
    ordered as: targets first, then past-only covariates, then known-future
    covariates.
    """

    # Stacked targets and past covariate values; shape (n_variates, history_length), float32.
    context: np.ndarray
    # Future values aligned row-by-row with `context`; shape (n_variates, prediction_length),
    # float32. Target rows and past-only covariate rows are filled with NaN.
    future_covariates: np.ndarray
    # Number of target rows at the top of `context`.
    n_targets: int
    # Number of covariate rows (past-only + known-future) following the targets.
    n_covariates: int
    # Number of covariates that were listed under `future_covariates` in the raw task.
    n_future_covariates: int
def _to_numpy(value: Any) -> np.ndarray:
    """Convert a tensor or array-like into a NumPy array, leaving non-tensor dtypes untouched."""
    if isinstance(value, torch.Tensor):
        # detach()/cpu() let tensors that require grad or live on an accelerator convert cleanly;
        # the float32 cast covers dtypes NumPy cannot represent (bfloat16, etc.).
        value = value.detach().cpu().to(torch.float32).numpy()
    return np.asarray(value)


def prepare_single_task(
    task: RawTask,
    idx: int,
    prediction_length: int,
) -> PreparedTask:
    """Validate and prepare a single time series task.

    This is the core preprocessing logic extracted from Chronos2Dataset.

    Parameters
    ----------
    task
        Raw task with the required key `target` and the optional keys
        `past_covariates` and `future_covariates` (mappings from covariate name
        to 1-d values). Keys of `future_covariates` must be a subset of the keys
        of `past_covariates`.
    idx
        Position of the task in the input collection; only used in error messages.
    prediction_length
        Required length of every future covariate.

    Returns
    -------
    PreparedTask
        `context` stacks the target rows followed by the covariate rows, and
        `future_covariates` has one row per `context` row. Covariates are ordered
        past-only first, then known-future, each group sorted by name. Target rows
        and missing future covariates are NaN in `future_covariates`.

    Raises
    ------
    ValueError
        If the task has unknown or missing keys, the target is not 1-d or 2-d,
        the covariate containers are not mappings, the future keys are not a
        subset of the past keys, a covariate has the wrong shape, or a
        non-numeric future covariate has no fitted categorical encoder.
    """
    allowed_keys = {"target", "past_covariates", "future_covariates"}
    keys = set(task.keys())
    if not keys.issubset(allowed_keys):
        raise ValueError(
            f"Found invalid keys in element at index {idx}. "
            f"Allowed keys are {allowed_keys}, but found {keys}"
        )
    if "target" not in keys:
        raise ValueError(f"Element at index {idx} does not contain the required key 'target'")

    # Process target
    task_target = np.asarray(_to_numpy(task["target"]), dtype=np.float32)
    # Reject 0-d targets as well: they have no time axis to read the history length from.
    if task_target.ndim not in (1, 2):
        raise ValueError(
            "When the input is a list of dicts, the `target` should either be 1-d with shape (history_length,) "
            f" or 2-d with shape (n_variates, history_length). Found element at index {idx} with shape {tuple(task_target.shape)}."
        )
    history_length = task_target.shape[-1]
    # atleast_2d (rather than reshape(-1, history_length)) also works for an empty 1-d series,
    # so such a series can reach the caller's length filter instead of crashing here.
    task_target = np.atleast_2d(task_target)

    # Process covariate containers. Any Mapping is accepted, matching the RawTask type.
    cat_encoders: dict = {}
    task_past_covariates = task.get("past_covariates", {})
    if not isinstance(task_past_covariates, Mapping):
        raise ValueError(
            f"Found invalid type for `past_covariates` in element at index {idx}. "
            f"Expected dict, but found {type(task_past_covariates)}"
        )
    task_covariates_keys = sorted(task_past_covariates.keys())

    task_future_covariates = task.get("future_covariates", {})
    if not isinstance(task_future_covariates, Mapping):
        raise ValueError(
            f"Found invalid type for `future_covariates` in element at index {idx}. "
            f"Expected dict, but found {type(task_future_covariates)}"
        )
    task_future_covariates_keys = sorted(task_future_covariates.keys())
    future_key_set = set(task_future_covariates_keys)
    if not future_key_set.issubset(task_covariates_keys):
        raise ValueError(
            f"Expected keys in `future_covariates` to be a subset of `past_covariates` "
            f"{task_covariates_keys}, but found {task_future_covariates_keys} in element at index {idx}"
        )

    # Ordered: past-only first, then known-future
    task_past_only_keys = [k for k in task_covariates_keys if k not in future_key_set]
    task_ordered_covariate_keys = task_past_only_keys + task_future_covariates_keys

    # Process past covariates
    task_past_covariates_list: list[np.ndarray] = []
    for key in task_ordered_covariate_keys:
        tensor = _to_numpy(task_past_covariates[key])
        # Validate the shape *before* encoding: fitting an encoder on a wrong-length series
        # would otherwise fail with an unrelated indexing error.
        if tensor.ndim != 1 or len(tensor) != history_length:
            raise ValueError(
                f"Individual `past_covariates` must be 1-d with length {history_length}, "
                f"found: {key} with shape {tensor.shape} in element at index {idx}"
            )
        # Encode categorical variates
        if not np.issubdtype(tensor.dtype, np.number):
            categories = tensor.astype(str).reshape(-1, 1)
            if task_target.shape[0] == 1:
                # Single target: encode each category using the target values,
                # fitting only on time steps where the target is finite.
                cat_encoder = TargetEncoder(target_type="continuous", smooth=1.0)
                y = task_target.reshape(-1)
                mask = np.isfinite(y)
                cat_encoder.fit(categories[mask], y[mask])
            else:
                # Multiple targets: fall back to ordinal codes; unseen categories become NaN.
                cat_encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan)
                cat_encoder.fit(categories)
            tensor = cat_encoder.transform(categories).reshape(tensor.shape)
            cat_encoders[key] = cat_encoder
        task_past_covariates_list.append(tensor)

    if task_past_covariates_list:
        task_past_covariates_array = np.stack(task_past_covariates_list, axis=0)
    else:
        task_past_covariates_array = np.zeros((0, history_length), dtype=np.float32)

    # Process future covariates
    task_future_covariates_list: list[np.ndarray] = []
    for key in task_ordered_covariate_keys:
        raw_future = task_future_covariates.get(key)
        if raw_future is None:
            # Past-only covariate, or a known-future covariate without values: all-NaN row.
            tensor = np.full(prediction_length, np.nan)
        else:
            tensor = _to_numpy(raw_future)
        if tensor.ndim != 1 or len(tensor) != prediction_length:
            raise ValueError(
                f"Individual `future_covariates` must be 1-d with length {prediction_length}, "
                f"found: {key} with shape {tensor.shape} in element at index {idx}"
            )
        if not np.issubdtype(tensor.dtype, np.number):
            # Reuse the encoder fitted on the past values of the same covariate.
            cat_encoder = cat_encoders.get(key)
            if cat_encoder is None:
                raise ValueError(
                    f"Found non-numeric `future_covariates` for {key} in element at index {idx}, "
                    "but the corresponding `past_covariates` are numeric, so the values cannot be encoded"
                )
            tensor = cat_encoder.transform(tensor.astype(str).reshape(-1, 1)).reshape(tensor.shape)
        task_future_covariates_list.append(tensor)

    if task_future_covariates_list:
        task_future_covariates_array = np.stack(task_future_covariates_list, axis=0)
    else:
        task_future_covariates_array = np.zeros((0, prediction_length), dtype=np.float32)

    # Targets have no known future values, so their rows are NaN placeholders.
    task_future_covariates_target_padding = np.full(
        (task_target.shape[0], prediction_length), np.nan, dtype=np.float32
    )

    context = np.concatenate([task_target, task_past_covariates_array], axis=0).astype(np.float32)
    future_covariates = np.concatenate(
        [task_future_covariates_target_padding, task_future_covariates_array], axis=0
    ).astype(np.float32)

    return PreparedTask(
        context=context,
        future_covariates=future_covariates,
        n_targets=task_target.shape[0],
        n_covariates=task_past_covariates_array.shape[0],
        n_future_covariates=len(task_future_covariates_keys),
    )
def prepare_tasks(
    raw_tasks: Iterable[RawTask],
    prediction_length: int,
    min_past: int = 1,
    mode: DatasetMode | str = DatasetMode.TRAIN,
) -> list[PreparedTask]:
    """Prepare a collection of raw tasks via `prepare_single_task`.

    Outside of TEST mode, future covariates given as `None` or as an empty
    sequence are replaced with NaN arrays of length `prediction_length`, and
    tasks whose history is shorter than `min_past + prediction_length` are
    dropped. In TEST mode every task is kept as provided.

    Raises
    ------
    ValueError
        If no task remains after preparation and filtering.
    """
    if isinstance(mode, str):
        mode = DatasetMode(mode)
    is_non_test = mode != DatasetMode.TEST

    prepared_tasks: list[PreparedTask] = []
    for position, item in enumerate(raw_tasks):
        if is_non_test:
            declared_future = item.get("future_covariates", {})
            if declared_future:
                declared_future = cast(dict[str, TensorOrArray | None], declared_future)
                # The keys still declare which covariates are known in the future,
                # so missing values are filled with NaN placeholders instead of being dropped.
                filled_future = {
                    name: np.full(prediction_length, np.nan) if series is None or len(series) == 0 else series
                    for name, series in declared_future.items()
                }
                item = {**item, "future_covariates": filled_future}
            item = cast(dict[str, TensorOrArray | Mapping[str, TensorOrArray]], item)

        result = prepare_single_task(item, position, prediction_length)

        # Keep everything in TEST mode; otherwise require enough history.
        too_short = is_non_test and result["context"].shape[-1] < min_past + prediction_length
        if not too_short:
            prepared_tasks.append(result)

    if not prepared_tasks:
        raise ValueError(
            "The dataset is empty after filtering based on length. "
            "Provide longer time series or reduce min_past/prediction_length."
        )
    return prepared_tasks
def validate_prepared_schema(task: Any) -> None:
    """Validate that a task matches the PreparedTask schema.

    Checks that `task` is a mapping with all required keys and that both
    `context` and `future_covariates` are 2-d NumPy arrays or torch tensors.

    Parameters
    ----------
    task
        The candidate prepared task (typically the first element of the inputs).

    Raises
    ------
    TypeError
        If `task` is not a mapping, lacks a required key, or one of the array
        fields is not a 2-d `np.ndarray` / `torch.Tensor`.
    """
    if not isinstance(task, Mapping):
        raise TypeError(
            f"Expected task to be a dict-like, got {type(task).__name__}. "
            "Set convert_inputs=True to preprocess raw inputs."
        )

    required_keys = {"context", "future_covariates", "n_targets", "n_covariates", "n_future_covariates"}
    missing = required_keys - set(task.keys())
    if missing:
        raise TypeError(
            f"Task is missing required keys: {missing}. "
            "Set convert_inputs=True to preprocess raw inputs."
        )

    # Both array fields are consumed identically downstream, so check both up front
    # rather than letting a malformed `future_covariates` fail during iteration.
    for field in ("context", "future_covariates"):
        value = task[field]
        if not isinstance(value, (np.ndarray, torch.Tensor)) or value.ndim != 2:
            raise TypeError(
                f"Expected '{field}' to be 2-d array, got {type(value).__name__} "
                f"with shape {getattr(value, 'shape', 'N/A')}. "
                "Set convert_inputs=True to preprocess raw inputs."
            )