Remove all references of task_

This commit is contained in:
Oleksandr Shchur 2026-02-19 09:33:19 +00:00
parent 3a1a44e252
commit 115d6138dc

View file

@ -48,13 +48,13 @@ def left_pad_and_cat_2D(tensors: list[torch.Tensor]) -> torch.Tensor:
def validate_and_prepare_single_dict_input(
task: Mapping[str, TensorOrArray | Mapping[str, TensorOrArray]], idx: int, prediction_length: int
raw_input: Mapping[str, TensorOrArray | Mapping[str, TensorOrArray]], idx: int, prediction_length: int
) -> PreparedInput:
"""Validates and prepares a single dictionary input for Chronos2Model.
Parameters
----------
task
raw_input
A dictionary representing a time series that contains:
- `target` (required): a 1-d or 2-d `torch.Tensor` or `np.ndarray` of shape (history_length,) or (n_variates, history_length).
Forecasts will be generated for items in `target`.
@ -65,27 +65,27 @@ def validate_and_prepare_single_dict_input(
covariates and values must be 1-d `torch.Tensor` or `np.ndarray` with length equal to the `prediction_length`. All keys in
`future_covariates` must be a subset of the keys in `past_covariates`.
idx
Index of this task in the list of tasks, used for error messages
Index of this input in the list of inputs, used for error messages
prediction_length
Number of future time steps to predict, used to validate future covariates
Returns
------
A tuple containing:
- task_context_tensor: Concatenated tensor of target and past covariates of shape (group_size, history_length),
the first `task_n_targets` items along the first axis contain the target variables and the remaining items contain past-only covariates
A PreparedInput containing:
- context: Concatenated tensor of target and past covariates of shape (group_size, history_length),
the first `n_targets` items along the first axis contain the target variables and the remaining items contain past-only covariates
and past values of known future covariates.
- task_future_covariates_tensor: Tensor of future covariates of shape (group_size, prediction_length). The last `task_n_future_covariates`
- future_covariates: Tensor of future covariates of shape (group_size, prediction_length). The last `n_future_covariates`
items along the first axis contain future covariates. All the remaining elements corresponding to target and past-only covariates are NaNs.
- task_n_targets: Number of target variables
- task_n_covariates: Total number of covariates (sum of past-only and known future covariates)
- task_n_future_covariates: Number of known future covariates
- n_targets: Number of target variables
- n_covariates: Total number of covariates (sum of past-only and known future covariates)
- n_future_covariates: Number of known future covariates
"""
allowed_keys = {"target", "past_covariates", "future_covariates"}
# validate keys
keys = set(task.keys())
keys = set(raw_input.keys())
if not keys.issubset(allowed_keys):
raise ValueError(
f"Found invalid keys in element at index {idx}. Allowed keys are {allowed_keys}, but found {keys}"
@ -94,58 +94,58 @@ def validate_and_prepare_single_dict_input(
raise ValueError(f"Element at index {idx} does not contain the required key 'target'")
# validate target
task_target = task["target"]
if isinstance(task_target, np.ndarray):
task_target = torch.from_numpy(task_target)
assert isinstance(task_target, torch.Tensor)
if task_target.ndim > 2:
target = raw_input["target"]
if isinstance(target, np.ndarray):
target = torch.from_numpy(target)
assert isinstance(target, torch.Tensor)
if target.ndim > 2:
raise ValueError(
"When the input is a list of dicts, the `target` should either be 1-d with shape (history_length,) "
f" or 2-d with shape (n_variates, history_length). Found element at index {idx} with shape {tuple(task_target.shape)}."
f" or 2-d with shape (n_variates, history_length). Found element at index {idx} with shape {tuple(target.shape)}."
)
history_length = task_target.shape[-1]
task_target = task_target.view(-1, history_length)
history_length = target.shape[-1]
target = target.view(-1, history_length)
# validate past_covariates
cat_encoders: dict = {}
task_past_covariates = task.get("past_covariates", {})
if not isinstance(task_past_covariates, dict):
past_covariates = raw_input.get("past_covariates", {})
if not isinstance(past_covariates, dict):
raise ValueError(
f"Found invalid type for `past_covariates` in element at index {idx}. "
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(task_past_covariates)}'
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(past_covariates)}'
)
# gather keys and ensure known-future keys come last to match downstream assumptions
task_covariates_keys = sorted(task_past_covariates.keys())
covariates_keys = sorted(past_covariates.keys())
task_future_covariates = task.get("future_covariates", {})
if not isinstance(task_future_covariates, dict):
future_covariates = raw_input.get("future_covariates", {})
if not isinstance(future_covariates, dict):
raise ValueError(
f"Found invalid type for `future_covariates` in element at index {idx}. "
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(task_future_covariates)}'
f'Expected dict with {{"feat_1": tensor_1, "feat_2": tensor_2, ...}}, but found {type(future_covariates)}'
)
task_future_covariates_keys = sorted(task_future_covariates.keys())
if not set(task_future_covariates_keys).issubset(task_covariates_keys):
future_covariates_keys = sorted(future_covariates.keys())
if not set(future_covariates_keys).issubset(covariates_keys):
raise ValueError(
f"Expected keys in `future_covariates` to be a subset of `past_covariates` {task_covariates_keys}, "
f"but found {task_future_covariates_keys} in element at index {idx}"
f"Expected keys in `future_covariates` to be a subset of `past_covariates` {covariates_keys}, "
f"but found {future_covariates_keys} in element at index {idx}"
)
# create ordered keys: past-only first, then known-future (so known-future are the last rows)
task_past_only_keys = [k for k in task_covariates_keys if k not in task_future_covariates_keys] # past_only_keys
task_ordered_covariate_keys = task_past_only_keys + task_future_covariates_keys
past_only_keys = [k for k in covariates_keys if k not in future_covariates_keys]
ordered_covariate_keys = past_only_keys + future_covariates_keys
task_past_covariates_list: list[torch.Tensor] = []
for key in task_ordered_covariate_keys:
tensor = task_past_covariates[key]
past_covariates_list: list[torch.Tensor] = []
for key in ordered_covariate_keys:
tensor = past_covariates[key]
if isinstance(tensor, np.ndarray):
# apply encoding to categorical variates
if not np.issubdtype(tensor.dtype, np.number):
# target encoding, if the target is 1-d
if task_target.shape[0] == 1:
if target.shape[0] == 1:
cat_encoder = TargetEncoder(target_type="continuous", smooth=1.0)
X = tensor.astype(str).reshape(-1, 1)
y = task_target.view(-1).numpy()
y = target.view(-1).numpy()
mask = np.isfinite(y)
X = X[mask]
y = y[mask]
@ -163,18 +163,18 @@ def validate_and_prepare_single_dict_input(
f"Individual `past_covariates` must be 1-d with length equal to the length of `target` (= {history_length}), "
f"found: {key} with shape {tuple(tensor.shape)} in element at index {idx}"
)
task_past_covariates_list.append(tensor)
task_past_covariates_tensor = (
torch.stack(task_past_covariates_list, dim=0)
if task_past_covariates_list
else torch.zeros((0, history_length), device=task_target.device)
past_covariates_list.append(tensor)
past_covariates_tensor = (
torch.stack(past_covariates_list, dim=0)
if past_covariates_list
else torch.zeros((0, history_length), device=target.device)
)
# validate future_covariates (build rows in the same task_ordered_covariate_keys order)
task_future_covariates_list: list[torch.Tensor] = []
for key in task_ordered_covariate_keys:
# validate future_covariates (build rows in the same ordered_covariate_keys order)
future_covariates_list: list[torch.Tensor] = []
for key in ordered_covariate_keys:
# future values of past-only covariates are filled with NaNs
tensor = task_future_covariates.get(key, torch.full((prediction_length,), fill_value=torch.nan))
tensor = future_covariates.get(key, torch.full((prediction_length,), fill_value=torch.nan))
if isinstance(tensor, np.ndarray):
# apply encoding to categorical variates
if not np.issubdtype(tensor.dtype, np.number):
@ -187,32 +187,32 @@ def validate_and_prepare_single_dict_input(
f"Individual `future_covariates` must be 1-d with length equal to the {prediction_length=}, "
f"found: {key} with shape {tuple(tensor.shape)} in element at index {idx}"
)
task_future_covariates_list.append(tensor)
task_future_covariates_tensor = (
torch.stack(task_future_covariates_list, dim=0)
if task_future_covariates_list
else torch.zeros((0, prediction_length), device=task_target.device)
future_covariates_list.append(tensor)
future_covariates_tensor = (
torch.stack(future_covariates_list, dim=0)
if future_covariates_list
else torch.zeros((0, prediction_length), device=target.device)
)
# future values of target series are filled with NaNs
task_future_covariates_target_padding = torch.full(
(task_target.shape[0], prediction_length), fill_value=torch.nan, device=task_target.device
future_covariates_target_padding = torch.full(
(target.shape[0], prediction_length), fill_value=torch.nan, device=target.device
)
task_context_tensor = torch.cat([task_target, task_past_covariates_tensor], dim=0).to(dtype=torch.float32)
task_future_covariates_tensor = torch.cat(
[task_future_covariates_target_padding, task_future_covariates_tensor], dim=0
context_tensor = torch.cat([target, past_covariates_tensor], dim=0).to(dtype=torch.float32)
future_covariates_tensor = torch.cat(
[future_covariates_target_padding, future_covariates_tensor], dim=0
).to(dtype=torch.float32)
task_n_targets = task_target.shape[0]
task_n_covariates = task_past_covariates_tensor.shape[0]
n_targets = target.shape[0]
n_covariates = past_covariates_tensor.shape[0]
# number of known-future covariates
task_n_future_covariates = len(task_future_covariates_keys)
n_future_covariates = len(future_covariates_keys)
return PreparedInput(
context=task_context_tensor,
future_covariates=task_future_covariates_tensor,
n_targets=task_n_targets,
n_covariates=task_n_covariates,
n_future_covariates=task_n_future_covariates,
context=context_tensor,
future_covariates=future_covariates_tensor,
n_targets=n_targets,
n_covariates=n_covariates,
n_future_covariates=n_future_covariates,
)
@ -543,15 +543,15 @@ class Chronos2Dataset(IterableDataset):
self.mode = mode
def _construct_slice(self, input_idx: int) -> tuple[torch.Tensor, torch.Tensor | None, torch.Tensor, int]:
input = self.inputs[input_idx]
task_past_tensor = input["context"].clone() # shape: (task_n_targets + task_n_covariates, history_length)
task_future_tensor = input["future_covariates"].clone()
task_n_targets = int(input["n_targets"])
task_n_covariates = int(input["n_covariates"])
task_n_future_covariates = int(input["n_future_covariates"])
task_n_past_only_covariates = task_n_covariates - task_n_future_covariates
prepared = self.inputs[input_idx]
past_tensor = prepared["context"].clone() # shape: (n_targets + n_covariates, history_length)
future_tensor = prepared["future_covariates"].clone()
n_targets = int(prepared["n_targets"])
n_covariates = int(prepared["n_covariates"])
n_future_covariates = int(prepared["n_future_covariates"])
n_past_only_covariates = n_covariates - n_future_covariates
full_length = task_past_tensor.shape[-1]
full_length = past_tensor.shape[-1]
if self.mode == DatasetMode.TRAIN:
# slice a random subsequence from the full series
@ -565,74 +565,74 @@ class Chronos2Dataset(IterableDataset):
if slice_idx >= self.context_length:
# slice series, if it is longer than context_length
task_context = task_past_tensor[:, slice_idx - self.context_length : slice_idx]
context = past_tensor[:, slice_idx - self.context_length : slice_idx]
else:
task_context = task_past_tensor[:, :slice_idx]
context = past_tensor[:, :slice_idx]
# In the TEST mode, we have no target available and the task_future_covariates can be directly used
# In the TRAIN and VALIDATION modes, the target and task_future_covariates need to be constructed from
# the task_context_tensor by slicing the appropriate indices which we do below
# In the TEST mode, we have no target available and the future_covariates can be directly used
# In the TRAIN and VALIDATION modes, the target and future_covariates need to be constructed from
# the context_tensor by slicing the appropriate indices which we do below
if self.mode in [DatasetMode.TRAIN, DatasetMode.VALIDATION]:
# the first task_n_targets elements in task_context_tensor are the targets
task_future_target = task_past_tensor[:, slice_idx : slice_idx + self.prediction_length].clone()
# the first n_targets elements in context_tensor are the targets
future_target = past_tensor[:, slice_idx : slice_idx + self.prediction_length].clone()
# mask out all rows corresponding to covariates
task_future_target[task_n_targets:] = torch.nan
future_target[n_targets:] = torch.nan
if task_n_future_covariates > 0:
# the last task_n_future_covariates elements in task_context_tensor are the known covariates
task_future_covariates = task_past_tensor[
-task_n_future_covariates:, slice_idx : slice_idx + self.prediction_length
if n_future_covariates > 0:
# the last n_future_covariates elements in context_tensor are the known covariates
future_covariates = past_tensor[
-n_future_covariates:, slice_idx : slice_idx + self.prediction_length
]
else:
# zero-length tensor for easy concatenation later
task_future_covariates = torch.zeros((0, self.prediction_length))
future_covariates = torch.zeros((0, self.prediction_length))
# the leading task_n_targets + task_n_past_only_covariates elements are masked because the target(s)
# the leading n_targets + n_past_only_covariates elements are masked because the target(s)
# and past-only covariates are not known into the future
task_future_covariates_padding = torch.full(
(task_n_targets + task_n_past_only_covariates, self.prediction_length),
future_covariates_padding = torch.full(
(n_targets + n_past_only_covariates, self.prediction_length),
fill_value=torch.nan,
)
task_future_covariates = torch.cat([task_future_covariates_padding, task_future_covariates], dim=0)
future_covariates = torch.cat([future_covariates_padding, future_covariates], dim=0)
else:
task_future_target = None
task_future_covariates = task_future_tensor
future_target = None
future_covariates = future_tensor
# task_context: (task_n_targets + task_n_covariates, min(context_length, history_length))
# task_future_target: (task_n_targets + task_n_covariates, prediction_length), the future values of known future covariates
# context: (n_targets + n_covariates, min(context_length, history_length))
# future_target: (n_targets + n_covariates, prediction_length), the future values of known future covariates
# are ignored during loss computation
# task_future_covariates: (task_n_targets + task_n_past_only_covariates + task_n_future_covariates, prediction_length),
# future_covariates: (n_targets + n_past_only_covariates + n_future_covariates, prediction_length),
# the entries corresponding to targets and past-only covariates are NaNs
return task_context, task_future_target, task_future_covariates, task_n_targets
return context, future_target, future_covariates, n_targets
def _build_batch(self, input_indices: list[int]) -> dict[str, torch.Tensor | int | list[tuple[int, int]] | None]:
"""Build a batch from given input indices."""
batch_context_tensor_list = []
batch_future_target_tensor_list = []
batch_future_covariates_tensor_list = []
batch_context_list = []
batch_future_target_list = []
batch_future_covariates_list = []
batch_group_ids_list = []
target_idx_ranges: list[tuple[int, int]] = []
target_start_idx = 0
for group_id, input_idx in enumerate(input_indices):
task_context, task_future_target, task_future_covariates, task_n_targets = self._construct_slice(input_idx)
context, future_target, future_covariates, n_targets = self._construct_slice(input_idx)
group_size = task_context.shape[0]
task_group_ids = torch.full((group_size,), fill_value=group_id)
batch_context_tensor_list.append(task_context)
batch_future_target_tensor_list.append(task_future_target)
batch_future_covariates_tensor_list.append(task_future_covariates)
batch_group_ids_list.append(task_group_ids)
target_idx_ranges.append((target_start_idx, target_start_idx + task_n_targets))
group_size = context.shape[0]
group_ids = torch.full((group_size,), fill_value=group_id)
batch_context_list.append(context)
batch_future_target_list.append(future_target)
batch_future_covariates_list.append(future_covariates)
batch_group_ids_list.append(group_ids)
target_idx_ranges.append((target_start_idx, target_start_idx + n_targets))
target_start_idx += group_size
return {
"context": left_pad_and_cat_2D(batch_context_tensor_list),
"context": left_pad_and_cat_2D(batch_context_list),
"future_target": None
if self.mode == DatasetMode.TEST
else torch.cat(cast(list[torch.Tensor], batch_future_target_tensor_list), dim=0),
"future_covariates": torch.cat(batch_future_covariates_tensor_list, dim=0),
else torch.cat(cast(list[torch.Tensor], batch_future_target_list), dim=0),
"future_covariates": torch.cat(batch_future_covariates_list, dim=0),
"group_ids": torch.cat(batch_group_ids_list, dim=0),
"num_output_patches": self.num_output_patches,
"target_idx_ranges": target_idx_ranges,