From e55233d0c3761b0c3658ee5ca8f25261a750c7de Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 18 Feb 2026 13:57:29 +0000 Subject: [PATCH] Move input preparation outside of Chronos2Dataset --- src/chronos/chronos2/dataset.py | 124 ++++++------- src/chronos/chronos2/pipeline.py | 65 +++++-- src/chronos/chronos2/preprocessing.py | 241 ++++++++++++++++++++++++++ 3 files changed, 346 insertions(+), 84 deletions(-) create mode 100644 src/chronos/chronos2/preprocessing.py diff --git a/src/chronos/chronos2/dataset.py b/src/chronos/chronos2/dataset.py index cb75571..b680b84 100644 --- a/src/chronos/chronos2/dataset.py +++ b/src/chronos/chronos2/dataset.py @@ -15,6 +15,7 @@ from torch.utils.data import IterableDataset if TYPE_CHECKING: import datasets import fev + from chronos.chronos2.preprocessing import PreparedTask, RawTask TensorOrArray: TypeAlias = torch.Tensor | np.ndarray @@ -383,49 +384,63 @@ class Chronos2Dataset(IterableDataset): Arguments ---------- inputs - Time series data. Must be a list of dictionaries where each dictionary may have the following keys. - - `target` (required): a 1-d or 2-d `torch.Tensor` or `np.ndarray` of shape (history_length,) or (n_variates, history_length). - Forecasts will be generated for items in `target`. - - `past_covariates` (optional): a dict of past-only covariates or past values of known future covariates. The keys of the dict - must be names of the covariates and values must be 1-d `torch.Tensor` or `np.ndarray` with length equal to the `history_length` - of `target`. - - `future_covariates` (optional): a dict of future values of known future covariates. The keys of the dict must be names of the - covariates and values must be 1-d `torch.Tensor` or `np.ndarray` with length equal to the `prediction_length`. All keys in - `future_covariates` must be a subset of the keys in `past_covariates`. - Note: when the mode is set to TRAIN, the values inside `future_covariates` are not technically used for training the model; - however, this key is used to infer which covariates are known into the future. Therefore, if your task contains known future covariates, - make sure that this key exists in `inputs`. The values of individual future covariates may be set to `None` or an empty array. + Time series data. Can be either: + + 1. Raw inputs (when `convert_inputs=True`, default): A sequence of dictionaries where each + dictionary may have the following keys: + - `target` (required): a 1-d or 2-d `torch.Tensor` or `np.ndarray` of shape (history_length,) + or (n_variates, history_length). + - `past_covariates` (optional): a dict of past-only covariates or past values of known future + covariates. + - `future_covariates` (optional): a dict of future values of known future covariates. + + 2. Pre-processed inputs (when `convert_inputs=False`): A sequence of prepared tasks, each with keys: + - `context`: 2-d array of shape (n_variates, history_length) + - `future_covariates`: 2-d array of shape (n_variates, prediction_length) + - `n_targets`, `n_covariates`, `n_future_covariates`: int metadata + + Use `chronos.chronos2.preprocessing.prepare_tasks()` to create pre-processed inputs. context_length The maximum context length used for training or inference prediction_length The prediction horizon batch_size - The batch size for training the model. Note that the batch size here means the number of time series, including target(s) and - covariates, that are input into the model. If your data has multiple target and/or covariates, the effective number of time series - tasks in a batch will be lower than this value. + The batch size for training the model. Note that the batch size here means the number of time series, + including target(s) and covariates, that are input into the model. output_patch_size - The output patch size of the model. This is used to compute the number of patches needed to cover `prediction_length` + The output patch size of the model. This is used to compute the number of patches needed to cover + `prediction_length` min_past - The minimum number of time steps the context must have during training. All time series shorter than `min_past + prediction_length` - are filtered out, by default 1 + The minimum number of time steps the context must have during training. All time series shorter than + `min_past + prediction_length` are filtered out, by default 1 mode `DatasetMode` governing whether to generate training, validation or test samples, by default "train" + convert_inputs + If True (default), preprocess raw inputs. If False, inputs are expected to be already preprocessed. """ def __init__( self, - inputs: Sequence[Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]]], + inputs: "Sequence[PreparedTask | RawTask]", context_length: int, prediction_length: int, batch_size: int, output_patch_size: int, min_past: int = 1, mode: str | DatasetMode = DatasetMode.TRAIN, + convert_inputs: bool = True, ) -> None: super().__init__() assert mode in {DatasetMode.TRAIN, DatasetMode.VALIDATION, DatasetMode.TEST}, f"Invalid mode: {mode}" - self.tasks = Chronos2Dataset._prepare_tasks(inputs, prediction_length, min_past, mode) + if convert_inputs: + from chronos.chronos2.preprocessing import prepare_tasks + self.tasks = prepare_tasks(inputs, prediction_length, min_past, mode) + else: + from chronos.chronos2.preprocessing import validate_prepared_schema + validate_prepared_schema(inputs[0]) + self.tasks = inputs + self.context_length = context_length self.prediction_length = prediction_length self.batch_size = batch_size @@ -433,51 +448,20 @@ class Chronos2Dataset(IterableDataset): self.min_past = min_past self.mode = mode - @staticmethod - def _prepare_tasks( - inputs: Sequence[Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]]], - prediction_length: int, - min_past: int, - mode: str | DatasetMode, - ): - tasks = [] - for idx, raw_task in enumerate(inputs): - if mode != DatasetMode.TEST: - raw_future_covariates = raw_task.get("future_covariates", {}) - raw_future_covariates = cast(dict[str, TensorOrArray | None], raw_future_covariates) - if raw_future_covariates: - fixed_future_covariates = {} - for key, value in raw_future_covariates.items(): - fixed_future_covariates[key] = ( - np.full(prediction_length, np.nan) if value is None or len(value) == 0 else value - ) - raw_task = {**raw_task, "future_covariates": fixed_future_covariates} - - raw_task = cast(dict[str, TensorOrArray | Mapping[str, TensorOrArray]], raw_task) - # convert to a format compatible with model's forward - task = validate_and_prepare_single_dict_task(raw_task, idx, prediction_length) - - if mode != DatasetMode.TEST and task[0].shape[-1] < min_past + prediction_length: - # filter tasks based on min_past + prediction_length - continue - tasks.append(task) - - if len(tasks) == 0: - raise ValueError( - "The dataset is empty after filtering based on the length of the time series (length >= min_past + prediction_length). " - "Please provide longer time series or reduce `min_past` or `prediction_length`. " - ) - return tasks - def _construct_slice(self, task_idx: int) -> tuple[torch.Tensor, torch.Tensor | None, torch.Tensor, int]: - ( - task_past_tensor, # shape: (task_n_targets + task_n_covariates, history_length) - task_future_tensor, - task_n_targets, - task_n_covariates, - task_n_future_covariates, - ) = self.tasks[task_idx] - task_past_tensor, task_future_tensor = task_past_tensor.clone(), task_future_tensor.clone() + task = self.tasks[task_idx] + # Convert numpy arrays to torch tensors if needed + context = task["context"] + future_cov = task["future_covariates"] + if isinstance(context, np.ndarray): + context = torch.from_numpy(context) + if isinstance(future_cov, np.ndarray): + future_cov = torch.from_numpy(future_cov) + task_past_tensor = context.clone().to(torch.float32) + task_future_tensor = future_cov.clone().to(torch.float32) + task_n_targets = task["n_targets"] + task_n_covariates = task["n_covariates"] + task_n_future_covariates = task["n_future_covariates"] task_n_past_only_covariates = task_n_covariates - task_n_future_covariates full_length = task_past_tensor.shape[-1] @@ -575,7 +559,7 @@ class Chronos2Dataset(IterableDataset): while current_batch_size < self.batch_size: task_idx = np.random.randint(len(self.tasks)) task_indices.append(task_idx) - current_batch_size += self.tasks[task_idx][0].shape[0] + current_batch_size += self.tasks[task_idx]["context"].shape[0] yield self._build_batch(task_indices) @@ -587,7 +571,7 @@ class Chronos2Dataset(IterableDataset): while task_idx < len(self.tasks) and current_batch_size < self.batch_size: task_indices.append(task_idx) - current_batch_size += self.tasks[task_idx][0].shape[0] + current_batch_size += self.tasks[task_idx]["context"].shape[0] task_idx += 1 yield self._build_batch(task_indices) @@ -631,7 +615,12 @@ class Chronos2Dataset(IterableDataset): min_past: int = 1, mode: str | DatasetMode = DatasetMode.TRAIN, ) -> "Chronos2Dataset": - """Convert from different input formats to a Chronos2Dataset.""" + """Convert from different input formats to a Chronos2Dataset. + + This method handles various input formats (tensors, list of tensors, list of dicts) + and creates a dataset with preprocessing. + """ + # Convert various input formats to list of dicts if isinstance(inputs, (torch.Tensor, np.ndarray)): inputs = convert_tensor_input_to_list_of_dicts_input(inputs) elif isinstance(inputs, list) and all([isinstance(x, (torch.Tensor, np.ndarray)) for x in inputs]): @@ -652,4 +641,5 @@ class Chronos2Dataset(IterableDataset): output_patch_size=output_patch_size, min_past=min_past, mode=mode, + convert_inputs=True, ) diff --git a/src/chronos/chronos2/pipeline.py b/src/chronos/chronos2/pipeline.py index c7ccbbb..51c3c98 100644 --- a/src/chronos/chronos2/pipeline.py +++ b/src/chronos/chronos2/pipeline.py @@ -115,6 +115,7 @@ class Chronos2Pipeline(BaseChronosPipeline): callbacks: list["TrainerCallback"] | None = None, remove_printer_callback: bool = False, disable_data_parallel: bool = True, + convert_inputs: bool = True, **extra_trainer_kwargs, ) -> "Chronos2Pipeline": """ @@ -161,6 +162,12 @@ class Chronos2Pipeline(BaseChronosPipeline): If True, all instances of `PrinterCallback` are removed from callbacks disable_data_parallel If True, ensures that DataParallel is disabled and training happens on a single GPU + convert_inputs + If True (default), preprocess raw inputs (convert tensors, encode categoricals, validate). + If False, inputs are expected to be already preprocessed (e.g., loaded from Arrow file + using `chronos.chronos2.preprocessing.ArrowTaskSequence` or prepared manually with + `chronos.chronos2.preprocessing.prepare_tasks`). This allows for efficient training on + large datasets that don't fit in memory. **extra_trainer_kwargs Extra kwargs are directly forwarded to `TrainingArguments` @@ -229,15 +236,27 @@ class Chronos2Pipeline(BaseChronosPipeline): if min_past is None: min_past = prediction_length - train_dataset = Chronos2Dataset.convert_inputs( - inputs=inputs, - context_length=context_length, - prediction_length=prediction_length, - batch_size=batch_size, - output_patch_size=self.model_output_patch_size, - min_past=min_past, - mode=DatasetMode.TRAIN, - ) + if convert_inputs: + train_dataset = Chronos2Dataset.convert_inputs( + inputs=inputs, + context_length=context_length, + prediction_length=prediction_length, + batch_size=batch_size, + output_patch_size=self.model_output_patch_size, + min_past=min_past, + mode=DatasetMode.TRAIN, + ) + else: + train_dataset = Chronos2Dataset( + inputs=inputs, + context_length=context_length, + prediction_length=prediction_length, + batch_size=batch_size, + output_patch_size=self.model_output_patch_size, + min_past=min_past, + mode=DatasetMode.TRAIN, + convert_inputs=False, + ) if output_dir is None: output_dir = Path("chronos-2-finetuned") / time.strftime("%Y-%m-%d_%H-%M-%S") @@ -291,14 +310,26 @@ class Chronos2Pipeline(BaseChronosPipeline): callbacks = callbacks or [] if validation_inputs is not None: # construct validation dataset - eval_dataset = Chronos2Dataset.convert_inputs( - inputs=validation_inputs, - context_length=context_length, - prediction_length=prediction_length, - batch_size=batch_size, - output_patch_size=self.model_output_patch_size, - mode=DatasetMode.VALIDATION, - ) + if convert_inputs: + eval_dataset = Chronos2Dataset.convert_inputs( + inputs=validation_inputs, + context_length=context_length, + prediction_length=prediction_length, + batch_size=batch_size, + output_patch_size=self.model_output_patch_size, + mode=DatasetMode.VALIDATION, + ) + else: + eval_dataset = Chronos2Dataset( + inputs=validation_inputs, + context_length=context_length, + prediction_length=prediction_length, + batch_size=batch_size, + output_patch_size=self.model_output_patch_size, + min_past=min_past, + mode=DatasetMode.VALIDATION, + convert_inputs=False, + ) # set validation parameters training_kwargs["save_strategy"] = "steps" diff --git a/src/chronos/chronos2/preprocessing.py b/src/chronos/chronos2/preprocessing.py new file mode 100644 index 0000000..bac33df --- /dev/null +++ b/src/chronos/chronos2/preprocessing.py @@ -0,0 +1,241 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Preprocessing utilities for Chronos-2 datasets. + +This module provides functions to prepare raw time series data for training. +""" + +from typing import Any, Iterable, Mapping, Sequence, TypedDict, cast + +import numpy as np +import torch +from sklearn.preprocessing import OrdinalEncoder, TargetEncoder + +from chronos.chronos2.dataset import DatasetMode + +TensorOrArray = torch.Tensor | np.ndarray + +# Type alias for raw input format +RawTask = Mapping[str, TensorOrArray | Mapping[str, TensorOrArray | None]] + + +class PreparedTask(TypedDict): + """A preprocessed time series task ready for model training/inference.""" + + context: np.ndarray # (n_variates, history_length), float32 + future_covariates: np.ndarray # (n_variates, prediction_length), float32 + n_targets: int + n_covariates: int + n_future_covariates: int + + +def prepare_single_task( + task: RawTask, + idx: int, + prediction_length: int, +) -> PreparedTask: + """Validate and prepare a single time series task. + + This is the core preprocessing logic extracted from Chronos2Dataset. + """ + allowed_keys = {"target", "past_covariates", "future_covariates"} + + keys = set(task.keys()) + if not keys.issubset(allowed_keys): + raise ValueError( + f"Found invalid keys in element at index {idx}. " + f"Allowed keys are {allowed_keys}, but found {keys}" + ) + if "target" not in keys: + raise ValueError(f"Element at index {idx} does not contain the required key 'target'") + + # Process target + task_target = task["target"] + if isinstance(task_target, torch.Tensor): + # Convert to float32 first for numpy compatibility (handles bfloat16, etc.) + task_target = task_target.to(torch.float32).numpy() + task_target = np.asarray(task_target, dtype=np.float32) + + if task_target.ndim > 2: + raise ValueError( + "When the input is a list of dicts, the `target` should either be 1-d with shape (history_length,) " + f" or 2-d with shape (n_variates, history_length). Found element at index {idx} with shape {tuple(task_target.shape)}." + ) + history_length = task_target.shape[-1] + task_target = task_target.reshape(-1, history_length) + + # Process past_covariates + cat_encoders: dict = {} + task_past_covariates = task.get("past_covariates", {}) + if not isinstance(task_past_covariates, dict): + raise ValueError( + f"Found invalid type for `past_covariates` in element at index {idx}. " + f"Expected dict, but found {type(task_past_covariates)}" + ) + + task_covariates_keys = sorted(task_past_covariates.keys()) + + task_future_covariates = task.get("future_covariates", {}) + if not isinstance(task_future_covariates, dict): + raise ValueError( + f"Found invalid type for `future_covariates` in element at index {idx}. " + f"Expected dict, but found {type(task_future_covariates)}" + ) + task_future_covariates_keys = sorted(task_future_covariates.keys()) + + if not set(task_future_covariates_keys).issubset(task_covariates_keys): + raise ValueError( + f"Expected keys in `future_covariates` to be a subset of `past_covariates` " + f"{task_covariates_keys}, but found {task_future_covariates_keys} in element at index {idx}" + ) + + # Ordered: past-only first, then known-future + task_past_only_keys = [k for k in task_covariates_keys if k not in task_future_covariates_keys] + task_ordered_covariate_keys = task_past_only_keys + task_future_covariates_keys + + # Process past covariates + task_past_covariates_list: list[np.ndarray] = [] + for key in task_ordered_covariate_keys: + tensor = task_past_covariates[key] + if isinstance(tensor, torch.Tensor): + tensor = tensor.to(torch.float32).numpy() + tensor = np.asarray(tensor) + + # Encode categorical variates + if not np.issubdtype(tensor.dtype, np.number): + if task_target.shape[0] == 1: + cat_encoder = TargetEncoder(target_type="continuous", smooth=1.0) + X = tensor.astype(str).reshape(-1, 1) + y = task_target.reshape(-1) + mask = np.isfinite(y) + cat_encoder.fit(X[mask], y[mask]) + else: + cat_encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan) + cat_encoder.fit(tensor.astype(str).reshape(-1, 1)) + tensor = cat_encoder.transform(tensor.astype(str).reshape(-1, 1)).reshape(tensor.shape) + cat_encoders[key] = cat_encoder + + if tensor.ndim != 1 or len(tensor) != history_length: + raise ValueError( + f"Individual `past_covariates` must be 1-d with length {history_length}, " + f"found: {key} with shape {tensor.shape} in element at index {idx}" + ) + task_past_covariates_list.append(tensor) + + if task_past_covariates_list: + task_past_covariates_array = np.stack(task_past_covariates_list, axis=0) + else: + task_past_covariates_array = np.zeros((0, history_length), dtype=np.float32) + + # Process future covariates + task_future_covariates_list: list[np.ndarray] = [] + for key in task_ordered_covariate_keys: + tensor = task_future_covariates.get(key, np.full(prediction_length, np.nan)) + if tensor is None: + tensor = np.full(prediction_length, np.nan) + if isinstance(tensor, torch.Tensor): + tensor = tensor.to(torch.float32).numpy() + tensor = np.asarray(tensor) + + if not np.issubdtype(tensor.dtype, np.number): + cat_encoder = cat_encoders[key] + tensor = cat_encoder.transform(tensor.astype(str).reshape(-1, 1)).reshape(tensor.shape) + + if tensor.ndim != 1 or len(tensor) != prediction_length: + raise ValueError( + f"Individual `future_covariates` must be 1-d with length {prediction_length}, " + f"found: {key} with shape {tensor.shape} in element at index {idx}" + ) + task_future_covariates_list.append(tensor) + + if task_future_covariates_list: + task_future_covariates_array = np.stack(task_future_covariates_list, axis=0) + else: + task_future_covariates_array = np.zeros((0, prediction_length), dtype=np.float32) + + task_future_covariates_target_padding = np.full( + (task_target.shape[0], prediction_length), np.nan, dtype=np.float32 + ) + + context = np.concatenate([task_target, task_past_covariates_array], axis=0).astype(np.float32) + future_covariates = np.concatenate( + [task_future_covariates_target_padding, task_future_covariates_array], axis=0 + ).astype(np.float32) + + return PreparedTask( + context=context, + future_covariates=future_covariates, + n_targets=task_target.shape[0], + n_covariates=task_past_covariates_array.shape[0], + n_future_covariates=len(task_future_covariates_keys), + ) + + +def prepare_tasks( + raw_tasks: Iterable[RawTask], + prediction_length: int, + min_past: int = 1, + mode: DatasetMode | str = DatasetMode.TRAIN, +) -> list[PreparedTask]: + """Prepare multiple time series tasks for training/inference.""" + if isinstance(mode, str): + mode = DatasetMode(mode) + + tasks: list[PreparedTask] = [] + + for idx, raw_task in enumerate(raw_tasks): + # For non-TEST modes, fix future_covariates + if mode != DatasetMode.TEST: + raw_future_covariates = raw_task.get("future_covariates", {}) + if raw_future_covariates: + raw_future_covariates = cast(dict[str, TensorOrArray | None], raw_future_covariates) + fixed_future_covariates = {} + for key, value in raw_future_covariates.items(): + fixed_future_covariates[key] = ( + np.full(prediction_length, np.nan) if value is None or len(value) == 0 else value + ) + raw_task = {**raw_task, "future_covariates": fixed_future_covariates} + + raw_task = cast(dict[str, TensorOrArray | Mapping[str, TensorOrArray]], raw_task) + prepared = prepare_single_task(raw_task, idx, prediction_length) + + # Filter by minimum length + if mode != DatasetMode.TEST and prepared["context"].shape[-1] < min_past + prediction_length: + continue + + tasks.append(prepared) + + if len(tasks) == 0: + raise ValueError( + "The dataset is empty after filtering based on length. " + "Provide longer time series or reduce min_past/prediction_length." + ) + + return tasks + + +def validate_prepared_schema(task: Any) -> None: + """Validate that a task matches the PreparedTask schema.""" + if not isinstance(task, Mapping): + raise TypeError( + f"Expected task to be a dict-like, got {type(task).__name__}. " + "Set convert_inputs=True to preprocess raw inputs." + ) + + required_keys = {"context", "future_covariates", "n_targets", "n_covariates", "n_future_covariates"} + missing = required_keys - set(task.keys()) + if missing: + raise TypeError( + f"Task is missing required keys: {missing}. " + "Set convert_inputs=True to preprocess raw inputs." + ) + + context = task["context"] + if not isinstance(context, (np.ndarray, torch.Tensor)) or context.ndim != 2: + raise TypeError( + f"Expected 'context' to be 2-d array, got {type(context).__name__} " + f"with shape {getattr(context, 'shape', 'N/A')}. " + "Set convert_inputs=True to preprocess raw inputs." + )