diff --git a/src/chronos/df_utils.py b/src/chronos/df_utils.py index e17584f..e63ec4f 100644 --- a/src/chronos/df_utils.py +++ b/src/chronos/df_utils.py @@ -4,6 +4,7 @@ # Authors: Abdul Fatir Ansari +import warnings from typing import TYPE_CHECKING import numpy as np @@ -62,7 +63,7 @@ def validate_df_inputs( prediction_length: int, id_column: str = "item_id", timestamp_column: str = "timestamp", -) -> tuple["pd.DataFrame", "pd.DataFrame | None", "pd.Timedelta", list[int], list[int] | None, np.ndarray]: +) -> tuple["pd.DataFrame", "pd.DataFrame | None", str, list[int], np.ndarray]: """ Validates and prepares dataframe inputs @@ -95,7 +96,6 @@ def validate_df_inputs( - Validated and sorted future dataframe (if provided) - Inferred frequency of the time series - List of series lengths from input dataframe - - List of series lengths from future dataframe (if provided) - Original order of time series IDs Raises @@ -147,7 +147,7 @@ def validate_df_inputs( # Get series lengths series_lengths = df[id_column].value_counts(sort=False).to_list() - def validate_freq(timestamps: pd.Series, series_id: str): + def validate_freq(timestamps: pd.DatetimeIndex, series_id: str): freq = pd.infer_freq(timestamps) if not freq: raise ValueError(f"Could not infer frequency for series {series_id}") @@ -156,16 +156,15 @@ def validate_df_inputs( # Validate each series all_freqs = [] start_idx = 0 + timestamp_index = pd.DatetimeIndex(df[timestamp_column]) for length in series_lengths: if length < 3: - series_id = df.iloc[start_idx][id_column] + series_id = df[id_column].iloc[start_idx] raise ValueError( f"Every time series must have at least 3 data points, found {length=} for series {series_id}" ) - - series_data = df.iloc[start_idx : start_idx + length] - timestamps = series_data[timestamp_column] - series_id = series_data.iloc[0][id_column] + timestamps = timestamp_index[start_idx : start_idx + length] + series_id = df[id_column].iloc[start_idx] all_freqs.append(validate_freq(timestamps, series_id)) start_idx += length @@ -190,10 +189,10 @@ def validate_df_inputs( # Validate future series lengths match prediction_length future_start_idx = 0 + future_timestamps_index = pd.DatetimeIndex(future_df[timestamp_column]) for future_length in future_series_lengths: - future_series_data = future_df.iloc[future_start_idx : future_start_idx + future_length] - future_timestamps = future_series_data[timestamp_column] - future_series_id = future_series_data.iloc[0][id_column] + future_timestamps = future_timestamps_index[future_start_idx : future_start_idx + future_length] + future_series_id = future_df[id_column].iloc[future_start_idx] if future_length != prediction_length: raise ValueError( f"Future covariates all time series must have length {prediction_length}, got {future_length} for series {future_series_id}" @@ -206,7 +205,7 @@ def validate_df_inputs( assert len(series_lengths) == len(future_series_lengths) - return df, future_df, inferred_freq, series_lengths, future_series_lengths, original_order + return df, future_df, inferred_freq, series_lengths, original_order def convert_df_input_to_list_of_dicts_input( @@ -252,7 +251,7 @@ def convert_df_input_to_list_of_dicts_input( import pandas as pd - df, future_df, freq, series_lengths, future_series_lengths, original_order = validate_df_inputs( + df, future_df, freq, series_lengths, original_order = validate_df_inputs( df, future_df=future_df, id_column=id_column, @@ -264,50 +263,53 @@ def convert_df_input_to_list_of_dicts_input( # Convert to list of dicts format inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = [] prediction_timestamps: dict[str, pd.DatetimeIndex] = {} - start_idx: int = 0 - future_start_idx: int = 0 - for i, length in enumerate(series_lengths): - series_data = df.iloc[start_idx : start_idx + length] - # Extract target(s) - target_data = series_data[target_columns].to_numpy().T # Shape: (n_targets, history_length) - task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_data} + indptr = np.concatenate([[0], np.cumsum(series_lengths)]).astype("int64") + target_array = df[target_columns].to_numpy().T # Shape: (n_targets, len(df)) + last_ts = pd.DatetimeIndex(df[timestamp_column].iloc[indptr[1:] - 1]) # Shape: (n_series,) + offset = pd.tseries.frequencies.to_offset(freq) + with warnings.catch_warnings(): + # Silence PerformanceWarning for non-vectorized offsets https://github.com/pandas-dev/pandas/blob/95624ca2e99b0/pandas/core/arrays/datetimes.py#L822 + warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning) + # Generate all prediction timestamps at once by stacking offsets into shape (n_series * prediction_length) + prediction_timestamps_array = pd.DatetimeIndex( + np.dstack([last_ts + step * offset for step in range(1, prediction_length + 1)]).ravel() + ) - # Generate future timestamps - series_id = series_data.iloc[0][id_column] - last_timestamp = series_data[timestamp_column].iloc[-1] - future_ts = pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq=freq)[1:] - prediction_timestamps[series_id] = future_ts + past_covariates_dict = { + col: df[col].to_numpy() for col in df.columns if col not in [id_column, timestamp_column] + target_columns + } + if future_df is not None: + future_covariates_dict = { + col: future_df[col].to_numpy() for col in future_df.columns if col not in [id_column, timestamp_column] + } + + for i in range(len(series_lengths)): + start_idx, end_idx = indptr[i], indptr[i + 1] + future_start_idx, future_end_idx = i * prediction_length, (i + 1) * prediction_length + + series_id = df[id_column].iloc[start_idx] + prediction_timestamps[series_id] = prediction_timestamps_array[future_start_idx:future_end_idx] + task: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_array[:, start_idx:end_idx]} # Handle covariates if present - covariate_cols = [ - col for col in series_data.columns if col not in [id_column, timestamp_column] + target_columns - ] - - if covariate_cols: - past_covariates = {col: series_data[col].to_numpy() for col in covariate_cols} - task["past_covariates"] = past_covariates + if len(past_covariates_dict) > 0: + task["past_covariates"] = {col: values[start_idx:end_idx] for col, values in past_covariates_dict.items()} # Handle future covariates if future_df is not None: - assert future_series_lengths is not None - future_length = future_series_lengths[i] - future_data = future_df.iloc[future_start_idx : future_start_idx + future_length] - assert future_data[timestamp_column].iloc[0] == future_ts[0], ( + first_future_timestamp = future_df[timestamp_column].iloc[future_start_idx] + assert first_future_timestamp == prediction_timestamps[series_id][0], ( f"the first timestamp in future_df must be the first forecast timestamp, found mismatch " - f"({future_data[timestamp_column].iloc[0]} != {future_ts[0]}) in series {series_id}" + f"({first_future_timestamp} != {prediction_timestamps[series_id][0]}) in series {series_id}" ) - if len(future_data) > 0: - future_covariates = { - col: future_data[col].to_numpy() for col in covariate_cols if col in future_data.columns + if len(future_covariates_dict) > 0: + task["future_covariates"] = { + col: values[future_start_idx:future_end_idx] for col, values in future_covariates_dict.items() } - if future_covariates: - task["future_covariates"] = future_covariates - future_start_idx += future_length inputs.append(task) - start_idx += length assert len(inputs) == len(series_lengths)